]> git.proxmox.com Git - mirror_frr.git/blob - tests/topotests/lib/common_config.py
doc: Add `show ipv6 rpf X:X::X:X` command to docs
[mirror_frr.git] / tests / topotests / lib / common_config.py
1 #
2 # Copyright (c) 2019 by VMware, Inc. ("VMware")
3 # Used Copyright (c) 2018 by Network Device Education Foundation, Inc.
4 # ("NetDEF") in this file.
5 #
6 # Permission to use, copy, modify, and/or distribute this software
7 # for any purpose with or without fee is hereby granted, provided
8 # that the above copyright notice and this permission notice appear
9 # in all copies.
10 #
11 # THE SOFTWARE IS PROVIDED "AS IS" AND VMWARE DISCLAIMS ALL WARRANTIES
12 # WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
13 # MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL VMWARE BE LIABLE FOR
14 # ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY
15 # DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS,
16 # WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
17 # ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE
18 # OF THIS SOFTWARE.
19 #
20
21 import ipaddress
22 import json
23 import os
24 import platform
25 import socket
26 import subprocess
27 import sys
28 import traceback
29 import functools
30 from collections import OrderedDict
31 from copy import deepcopy
32 from datetime import datetime, timedelta
33 from functools import wraps
34 from re import search as re_search
35 from time import sleep
36
37 try:
38 # Imports from python2
39 import ConfigParser as configparser
40 except ImportError:
41 # Imports from python3
42 import configparser
43
44 from lib.micronet import comm_error
45 from lib.topogen import TopoRouter, get_topogen
46 from lib.topolog import get_logger, logger
47 from lib.topotest import frr_unicode, interface_set_status, version_cmp
48 from lib import topotest
49
50 FRRCFG_FILE = "frr_json.conf"
51 FRRCFG_BKUP_FILE = "frr_json_initial.conf"
52
53 ERROR_LIST = ["Malformed", "Failure", "Unknown", "Incomplete"]
54
55 ####
56 CD = os.path.dirname(os.path.realpath(__file__))
57 PYTESTINI_PATH = os.path.join(CD, "../pytest.ini")
58
59 # NOTE: to save execution logs to log file frrtest_log_dir must be configured
60 # in `pytest.ini`.
61 config = configparser.ConfigParser()
62 config.read(PYTESTINI_PATH)
63
64 config_section = "topogen"
65
66 # Debug logs for daemons
67 DEBUG_LOGS = {
68 "pimd": [
69 "debug msdp events",
70 "debug msdp packets",
71 "debug igmp events",
72 "debug igmp trace",
73 "debug mroute",
74 "debug mroute detail",
75 "debug pim events",
76 "debug pim packets",
77 "debug pim trace",
78 "debug pim zebra",
79 "debug pim bsm",
80 "debug pim packets joins",
81 "debug pim packets register",
82 "debug pim nht",
83 ],
84 "pim6d": [
85 "debug pimv6 events",
86 "debug pimv6 packets",
87 "debug pimv6 packet-dump send",
88 "debug pimv6 packet-dump receive",
89 "debug pimv6 trace",
90 "debug pimv6 trace detail",
91 "debug pimv6 zebra",
92 "debug pimv6 bsm",
93 "debug pimv6 packets hello",
94 "debug pimv6 packets joins",
95 "debug pimv6 packets register",
96 "debug pimv6 nht",
97 "debug pimv6 nht detail",
98 "debug mroute6",
99 "debug mroute6 detail",
100 "debug mld events",
101 "debug mld packets",
102 "debug mld trace",
103 ],
104 "bgpd": [
105 "debug bgp neighbor-events",
106 "debug bgp updates",
107 "debug bgp zebra",
108 "debug bgp nht",
109 "debug bgp neighbor-events",
110 "debug bgp graceful-restart",
111 "debug bgp update-groups",
112 "debug bgp vpn leak-from-vrf",
113 "debug bgp vpn leak-to-vrf",
114 "debug bgp zebr",
115 "debug bgp updates",
116 "debug bgp nht",
117 "debug bgp neighbor-events",
118 "debug vrf",
119 ],
120 "zebra": [
121 "debug zebra events",
122 "debug zebra rib",
123 "debug zebra vxlan",
124 "debug zebra nht",
125 ],
126 "ospf": [
127 "debug ospf event",
128 "debug ospf ism",
129 "debug ospf lsa",
130 "debug ospf nsm",
131 "debug ospf nssa",
132 "debug ospf packet all",
133 "debug ospf sr",
134 "debug ospf te",
135 "debug ospf zebra",
136 ],
137 "ospf6": [
138 "debug ospf6 event",
139 "debug ospf6 ism",
140 "debug ospf6 lsa",
141 "debug ospf6 nsm",
142 "debug ospf6 nssa",
143 "debug ospf6 packet all",
144 "debug ospf6 sr",
145 "debug ospf6 te",
146 "debug ospf6 zebra",
147 ],
148 }
149
150 g_iperf_client_procs = {}
151 g_iperf_server_procs = {}
152
153
154 def is_string(value):
155 try:
156 return isinstance(value, basestring)
157 except NameError:
158 return isinstance(value, str)
159
160
161 if config.has_option("topogen", "verbosity"):
162 loglevel = config.get("topogen", "verbosity")
163 loglevel = loglevel.lower()
164 else:
165 loglevel = "info"
166
167 if config.has_option("topogen", "frrtest_log_dir"):
168 frrtest_log_dir = config.get("topogen", "frrtest_log_dir")
169 time_stamp = datetime.time(datetime.now())
170 logfile_name = "frr_test_bgp_"
171 frrtest_log_file = frrtest_log_dir + logfile_name + str(time_stamp)
172 print("frrtest_log_file..", frrtest_log_file)
173
174 logger = get_logger(
175 "test_execution_logs", log_level=loglevel, target=frrtest_log_file
176 )
177 print("Logs will be sent to logfile: {}".format(frrtest_log_file))
178
179 if config.has_option("topogen", "show_router_config"):
180 show_router_config = config.get("topogen", "show_router_config")
181 else:
182 show_router_config = False
183
184 # env variable for setting what address type to test
185 ADDRESS_TYPES = os.environ.get("ADDRESS_TYPES")
186
187
188 # Saves sequence id numbers
189 SEQ_ID = {"prefix_lists": {}, "route_maps": {}}
190
191
192 def get_seq_id(obj_type, router, obj_name):
193 """
194 Generates and saves sequence number in interval of 10
195 Parameters
196 ----------
197 * `obj_type`: prefix_lists or route_maps
198 * `router`: router name
199 *` obj_name`: name of the prefix-list or route-map
200 Returns
201 --------
202 Sequence number generated
203 """
204
205 router_data = SEQ_ID[obj_type].setdefault(router, {})
206 obj_data = router_data.setdefault(obj_name, {})
207 seq_id = obj_data.setdefault("seq_id", 0)
208
209 seq_id = int(seq_id) + 10
210 obj_data["seq_id"] = seq_id
211
212 return seq_id
213
214
215 def set_seq_id(obj_type, router, id, obj_name):
216 """
217 Saves sequence number if not auto-generated and given by user
218 Parameters
219 ----------
220 * `obj_type`: prefix_lists or route_maps
221 * `router`: router name
222 *` obj_name`: name of the prefix-list or route-map
223 """
224 router_data = SEQ_ID[obj_type].setdefault(router, {})
225 obj_data = router_data.setdefault(obj_name, {})
226 seq_id = obj_data.setdefault("seq_id", 0)
227
228 seq_id = int(seq_id) + int(id)
229 obj_data["seq_id"] = seq_id
230
231
232 class InvalidCLIError(Exception):
233 """Raise when the CLI command is wrong"""
234
235
236 def run_frr_cmd(rnode, cmd, isjson=False):
237 """
238 Execute frr show commands in privileged mode
239 * `rnode`: router node on which command needs to be executed
240 * `cmd`: Command to be executed on frr
241 * `isjson`: If command is to get json data or not
242 :return str:
243 """
244
245 if cmd:
246 ret_data = rnode.vtysh_cmd(cmd, isjson=isjson)
247
248 if isjson:
249 rnode.vtysh_cmd(cmd.rstrip("json"), isjson=False)
250
251 return ret_data
252
253 else:
254 raise InvalidCLIError("No actual cmd passed")
255
256
257 def apply_raw_config(tgen, input_dict):
258
259 """
260 API to configure raw configuration on device. This can be used for any cli
261 which has not been implemented in JSON.
262
263 Parameters
264 ----------
265 * `tgen`: tgen object
266 * `input_dict`: configuration that needs to be applied
267
268 Usage
269 -----
270 input_dict = {
271 "r2": {
272 "raw_config": [
273 "router bgp",
274 "no bgp update-group-split-horizon"
275 ]
276 }
277 }
278 Returns
279 -------
280 True or errormsg
281 """
282
283 rlist = []
284
285 for router_name in input_dict.keys():
286 config_cmd = input_dict[router_name]["raw_config"]
287
288 if not isinstance(config_cmd, list):
289 config_cmd = [config_cmd]
290
291 frr_cfg_file = "{}/{}/{}".format(tgen.logdir, router_name, FRRCFG_FILE)
292 with open(frr_cfg_file, "w") as cfg:
293 for cmd in config_cmd:
294 cfg.write("{}\n".format(cmd))
295
296 rlist.append(router_name)
297
298 # Load config on all routers
299 return load_config_to_routers(tgen, rlist)
300
301
302 def create_common_configurations(
303 tgen, config_dict, config_type=None, build=False, load_config=True
304 ):
305 """
306 API to create object of class FRRConfig and also create frr_json.conf
307 file. It will create interface and common configurations and save it to
308 frr_json.conf and load to router
309 Parameters
310 ----------
311 * `tgen`: tgen object
312 * `config_dict`: Configuration data saved in a dict of { router: config-list }
313 * `routers` : list of router id to be configured.
314 * `config_type` : Syntactic information while writing configuration. Should
315 be one of the value as mentioned in the config_map below.
316 * `build` : Only for initial setup phase this is set as True
317 Returns
318 -------
319 True or False
320 """
321
322 config_map = OrderedDict(
323 {
324 "general_config": "! FRR General Config\n",
325 "debug_log_config": "! Debug log Config\n",
326 "interface_config": "! Interfaces Config\n",
327 "static_route": "! Static Route Config\n",
328 "prefix_list": "! Prefix List Config\n",
329 "bgp_community_list": "! Community List Config\n",
330 "route_maps": "! Route Maps Config\n",
331 "bgp": "! BGP Config\n",
332 "vrf": "! VRF Config\n",
333 "ospf": "! OSPF Config\n",
334 "ospf6": "! OSPF Config\n",
335 "pim": "! PIM Config\n",
336 }
337 )
338
339 if build:
340 mode = "a"
341 elif not load_config:
342 mode = "a"
343 else:
344 mode = "w"
345
346 routers = config_dict.keys()
347 for router in routers:
348 fname = "{}/{}/{}".format(tgen.logdir, router, FRRCFG_FILE)
349 try:
350 frr_cfg_fd = open(fname, mode)
351 if config_type:
352 frr_cfg_fd.write(config_map[config_type])
353 for line in config_dict[router]:
354 frr_cfg_fd.write("{} \n".format(str(line)))
355 frr_cfg_fd.write("\n")
356
357 except IOError as err:
358 logger.error("Unable to open FRR Config '%s': %s" % (fname, str(err)))
359 return False
360 finally:
361 frr_cfg_fd.close()
362
363 # If configuration applied from build, it will done at last
364 result = True
365 if not build and load_config:
366 result = load_config_to_routers(tgen, routers)
367
368 return result
369
370
371 def create_common_configuration(
372 tgen, router, data, config_type=None, build=False, load_config=True
373 ):
374 """
375 API to create object of class FRRConfig and also create frr_json.conf
376 file. It will create interface and common configurations and save it to
377 frr_json.conf and load to router
378 Parameters
379 ----------
380 * `tgen`: tgen object
381 * `data`: Configuration data saved in a list.
382 * `router` : router id to be configured.
383 * `config_type` : Syntactic information while writing configuration. Should
384 be one of the value as mentioned in the config_map below.
385 * `build` : Only for initial setup phase this is set as True
386 Returns
387 -------
388 True or False
389 """
390 return create_common_configurations(
391 tgen, {router: data}, config_type, build, load_config
392 )
393
394
395 def kill_router_daemons(tgen, router, daemons, save_config=True):
396 """
397 Router's current config would be saved to /etc/frr/ for each daemon
398 and daemon would be killed forcefully using SIGKILL.
399 * `tgen` : topogen object
400 * `router`: Device under test
401 * `daemons`: list of daemons to be killed
402 """
403
404 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
405
406 try:
407 router_list = tgen.routers()
408
409 if save_config:
410 # Saving router config to /etc/frr, which will be loaded to router
411 # when it starts
412 router_list[router].vtysh_cmd("write memory")
413
414 # Kill Daemons
415 result = router_list[router].killDaemons(daemons)
416 if len(result) > 0:
417 assert "Errors found post shutdown - details follow:" == 0, result
418 return result
419
420 except Exception as e:
421 errormsg = traceback.format_exc()
422 logger.error(errormsg)
423 return errormsg
424
425
426 def start_router_daemons(tgen, router, daemons):
427 """
428 Daemons defined by user would be started
429 * `tgen` : topogen object
430 * `router`: Device under test
431 * `daemons`: list of daemons to be killed
432 """
433
434 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
435
436 try:
437 router_list = tgen.routers()
438
439 # Start daemons
440 res = router_list[router].startDaemons(daemons)
441
442 except Exception as e:
443 errormsg = traceback.format_exc()
444 logger.error(errormsg)
445 res = errormsg
446
447 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
448 return res
449
450
451 def check_router_status(tgen):
452 """
453 Check if all daemons are running for all routers in topology
454 * `tgen` : topogen object
455 """
456
457 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
458
459 try:
460 router_list = tgen.routers()
461 for router, rnode in router_list.items():
462
463 result = rnode.check_router_running()
464 if result != "":
465 daemons = []
466 if "bgpd" in result:
467 daemons.append("bgpd")
468 if "zebra" in result:
469 daemons.append("zebra")
470 if "pimd" in result:
471 daemons.append("pimd")
472 if "pim6d" in result:
473 daemons.append("pim6d")
474 if "ospfd" in result:
475 daemons.append("ospfd")
476 if "ospf6d" in result:
477 daemons.append("ospf6d")
478 rnode.startDaemons(daemons)
479
480 except Exception as e:
481 errormsg = traceback.format_exc()
482 logger.error(errormsg)
483 return errormsg
484
485 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
486 return True
487
488
489 def save_initial_config_on_routers(tgen):
490 """Save current configuration on routers to FRRCFG_BKUP_FILE.
491
492 FRRCFG_BKUP_FILE is the file that will be restored when `reset_config_on_routers()`
493 is called.
494
495 Parameters
496 ----------
497 * `tgen` : Topogen object
498 """
499 router_list = tgen.routers()
500 target_cfg_fmt = tgen.logdir + "/{}/frr_json_initial.conf"
501
502 # Get all running configs in parallel
503 procs = {}
504 for rname in router_list:
505 logger.info("Fetching running config for router %s", rname)
506 procs[rname] = router_list[rname].popen(
507 ["/usr/bin/env", "vtysh", "-c", "show running-config no-header"],
508 stdin=None,
509 stdout=open(target_cfg_fmt.format(rname), "w"),
510 stderr=subprocess.PIPE,
511 )
512 for rname, p in procs.items():
513 _, error = p.communicate()
514 if p.returncode:
515 logger.error(
516 "Get running config for %s failed %d: %s", rname, p.returncode, error
517 )
518 raise InvalidCLIError(
519 "vtysh show running error on {}: {}".format(rname, error)
520 )
521
522
523 def reset_config_on_routers(tgen, routerName=None):
524 """
525 Resets configuration on routers to the snapshot created using input JSON
526 file. It replaces existing router configuration with FRRCFG_BKUP_FILE
527
528 Parameters
529 ----------
530 * `tgen` : Topogen object
531 * `routerName` : router config is to be reset
532 """
533
534 logger.debug("Entering API: reset_config_on_routers")
535
536 tgen.cfg_gen += 1
537 gen = tgen.cfg_gen
538
539 # Trim the router list if needed
540 router_list = tgen.routers()
541 if routerName:
542 if routerName not in router_list:
543 logger.warning(
544 "Exiting API: reset_config_on_routers: no router %s",
545 routerName,
546 exc_info=True,
547 )
548 return True
549 router_list = {routerName: router_list[routerName]}
550
551 delta_fmt = tgen.logdir + "/{}/delta-{}.conf"
552 # FRRCFG_BKUP_FILE
553 target_cfg_fmt = tgen.logdir + "/{}/frr_json_initial.conf"
554 run_cfg_fmt = tgen.logdir + "/{}/frr-{}.sav"
555
556 #
557 # Get all running configs in parallel
558 #
559 procs = {}
560 for rname in router_list:
561 logger.info("Fetching running config for router %s", rname)
562 procs[rname] = router_list[rname].popen(
563 ["/usr/bin/env", "vtysh", "-c", "show running-config no-header"],
564 stdin=None,
565 stdout=open(run_cfg_fmt.format(rname, gen), "w"),
566 stderr=subprocess.PIPE,
567 )
568 for rname, p in procs.items():
569 _, error = p.communicate()
570 if p.returncode:
571 logger.error(
572 "Get running config for %s failed %d: %s", rname, p.returncode, error
573 )
574 raise InvalidCLIError(
575 "vtysh show running error on {}: {}".format(rname, error)
576 )
577
578 #
579 # Get all delta's in parallel
580 #
581 procs = {}
582 for rname in router_list:
583 logger.info(
584 "Generating delta for router %s to new configuration (gen %d)", rname, gen
585 )
586 procs[rname] = tgen.net.popen(
587 [
588 "/usr/lib/frr/frr-reload.py",
589 "--test-reset",
590 "--input",
591 run_cfg_fmt.format(rname, gen),
592 "--test",
593 target_cfg_fmt.format(rname),
594 ],
595 stdin=None,
596 stdout=open(delta_fmt.format(rname, gen), "w"),
597 stderr=subprocess.PIPE,
598 )
599 for rname, p in procs.items():
600 _, error = p.communicate()
601 if p.returncode:
602 logger.error(
603 "Delta file creation for %s failed %d: %s", rname, p.returncode, error
604 )
605 raise InvalidCLIError("frr-reload error for {}: {}".format(rname, error))
606
607 #
608 # Apply all the deltas in parallel
609 #
610 procs = {}
611 for rname in router_list:
612 logger.info("Applying delta config on router %s", rname)
613
614 procs[rname] = router_list[rname].popen(
615 ["/usr/bin/env", "vtysh", "-f", delta_fmt.format(rname, gen)],
616 stdin=None,
617 stdout=subprocess.PIPE,
618 stderr=subprocess.STDOUT,
619 )
620 for rname, p in procs.items():
621 output, _ = p.communicate()
622 vtysh_command = "vtysh -f {}".format(delta_fmt.format(rname, gen))
623 if not p.returncode:
624 router_list[rname].logger.info(
625 '\nvtysh config apply => "{}"\nvtysh output <= "{}"'.format(
626 vtysh_command, output
627 )
628 )
629 else:
630 router_list[rname].logger.warning(
631 '\nvtysh config apply failed => "{}"\nvtysh output <= "{}"'.format(
632 vtysh_command, output
633 )
634 )
635 logger.error(
636 "Delta file apply for %s failed %d: %s", rname, p.returncode, output
637 )
638
639 # We really need to enable this failure; however, currently frr-reload.py
640 # producing invalid "no" commands as it just preprends "no", but some of the
641 # command forms lack matching values (e.g., final values). Until frr-reload
642 # is fixed to handle this (or all the CLI no forms are adjusted) we can't
643 # fail tests.
644 # raise InvalidCLIError("frr-reload error for {}: {}".format(rname, output))
645
646 #
647 # Optionally log all new running config if "show_router_config" is defined in
648 # "pytest.ini"
649 #
650 if show_router_config:
651 procs = {}
652 for rname in router_list:
653 logger.info("Fetching running config for router %s", rname)
654 procs[rname] = router_list[rname].popen(
655 ["/usr/bin/env", "vtysh", "-c", "show running-config no-header"],
656 stdin=None,
657 stdout=subprocess.PIPE,
658 stderr=subprocess.STDOUT,
659 )
660 for rname, p in procs.items():
661 output, _ = p.communicate()
662 if p.returncode:
663 logger.warning(
664 "Get running config for %s failed %d: %s",
665 rname,
666 p.returncode,
667 output,
668 )
669 else:
670 logger.info(
671 "Configuration on router %s after reset:\n%s", rname, output
672 )
673
674 logger.debug("Exiting API: reset_config_on_routers")
675 return True
676
677
678 def prep_load_config_to_routers(tgen, *config_name_list):
679 """Create common config for `load_config_to_routers`.
680
681 The common config file is constructed from the list of sub-config files passed as
682 position arguments to this function. Each entry in `config_name_list` is looked for
683 under the router sub-directory in the test directory and those files are
684 concatenated together to create the common config. e.g.,
685
686 # Routers are "r1" and "r2", test file is `example/test_example_foo.py`
687 prepare_load_config_to_routers(tgen, "bgpd.conf", "ospfd.conf")
688
689 When the above call is made the files in
690
691 example/r1/bgpd.conf
692 example/r1/ospfd.conf
693
694 Are concat'd together into a single config file that will be loaded on r1, and
695
696 example/r2/bgpd.conf
697 example/r2/ospfd.conf
698
699 Are concat'd together into a single config file that will be loaded on r2 when
700 the call to `load_config_to_routers` is made.
701 """
702
703 routers = tgen.routers()
704 for rname, router in routers.items():
705 destname = "{}/{}/{}".format(tgen.logdir, rname, FRRCFG_FILE)
706 wmode = "w"
707 for cfbase in config_name_list:
708 script_dir = os.environ["PYTEST_TOPOTEST_SCRIPTDIR"]
709 confname = os.path.join(script_dir, "{}/{}".format(rname, cfbase))
710 with open(confname, "r") as cf:
711 with open(destname, wmode) as df:
712 df.write(cf.read())
713 wmode = "a"
714
715
716 def load_config_to_routers(tgen, routers, save_bkup=False):
717 """
718 Loads configuration on routers from the file FRRCFG_FILE.
719
720 Parameters
721 ----------
722 * `tgen` : Topogen object
723 * `routers` : routers for which configuration is to be loaded
724 * `save_bkup` : If True, Saves snapshot of FRRCFG_FILE to FRRCFG_BKUP_FILE
725 Returns
726 -------
727 True or False
728 """
729
730 logger.debug("Entering API: load_config_to_routers")
731
732 tgen.cfg_gen += 1
733 gen = tgen.cfg_gen
734
735 base_router_list = tgen.routers()
736 router_list = {}
737 for router in routers:
738 if router not in base_router_list:
739 continue
740 router_list[router] = base_router_list[router]
741
742 frr_cfg_file_fmt = tgen.logdir + "/{}/" + FRRCFG_FILE
743 frr_cfg_save_file_fmt = tgen.logdir + "/{}/{}-" + FRRCFG_FILE
744 frr_cfg_bkup_fmt = tgen.logdir + "/{}/" + FRRCFG_BKUP_FILE
745
746 procs = {}
747 for rname in router_list:
748 router = router_list[rname]
749 try:
750 frr_cfg_file = frr_cfg_file_fmt.format(rname)
751 frr_cfg_save_file = frr_cfg_save_file_fmt.format(rname, gen)
752 frr_cfg_bkup = frr_cfg_bkup_fmt.format(rname)
753 with open(frr_cfg_file, "r+") as cfg:
754 data = cfg.read()
755 logger.info(
756 "Applying following configuration on router %s (gen: %d):\n%s",
757 rname,
758 gen,
759 data,
760 )
761 # Always save a copy of what we just did
762 with open(frr_cfg_save_file, "w") as bkup:
763 bkup.write(data)
764 if save_bkup:
765 with open(frr_cfg_bkup, "w") as bkup:
766 bkup.write(data)
767 procs[rname] = router_list[rname].popen(
768 ["/usr/bin/env", "vtysh", "-f", frr_cfg_file],
769 stdin=None,
770 stdout=subprocess.PIPE,
771 stderr=subprocess.STDOUT,
772 )
773 except IOError as err:
774 logger.error(
775 "Unable to open config File. error(%s): %s", err.errno, err.strerror
776 )
777 return False
778 except Exception as error:
779 logger.error("Unable to apply config on %s: %s", rname, str(error))
780 return False
781
782 errors = []
783 for rname, p in procs.items():
784 output, _ = p.communicate()
785 frr_cfg_file = frr_cfg_file_fmt.format(rname)
786 vtysh_command = "vtysh -f " + frr_cfg_file
787 if not p.returncode:
788 router_list[rname].logger.info(
789 '\nvtysh config apply => "{}"\nvtysh output <= "{}"'.format(
790 vtysh_command, output
791 )
792 )
793 else:
794 router_list[rname].logger.error(
795 '\nvtysh config apply failed => "{}"\nvtysh output <= "{}"'.format(
796 vtysh_command, output
797 )
798 )
799 logger.error(
800 "Config apply for %s failed %d: %s", rname, p.returncode, output
801 )
802 # We can't thorw an exception here as we won't clear the config file.
803 errors.append(
804 InvalidCLIError(
805 "load_config_to_routers error for {}: {}".format(rname, output)
806 )
807 )
808
809 # Empty the config file or we append to it next time through.
810 with open(frr_cfg_file, "r+") as cfg:
811 cfg.truncate(0)
812
813 # Router current configuration to log file or console if
814 # "show_router_config" is defined in "pytest.ini"
815 if show_router_config:
816 procs = {}
817 for rname in router_list:
818 procs[rname] = router_list[rname].popen(
819 ["/usr/bin/env", "vtysh", "-c", "show running-config no-header"],
820 stdin=None,
821 stdout=subprocess.PIPE,
822 stderr=subprocess.STDOUT,
823 )
824 for rname, p in procs.items():
825 output, _ = p.communicate()
826 if p.returncode:
827 logger.warning(
828 "Get running config for %s failed %d: %s",
829 rname,
830 p.returncode,
831 output,
832 )
833 else:
834 logger.info("New configuration for router %s:\n%s", rname, output)
835
836 logger.debug("Exiting API: load_config_to_routers")
837 return not errors
838
839
840 def load_config_to_router(tgen, routerName, save_bkup=False):
841 """
842 Loads configuration on router from the file FRRCFG_FILE.
843
844 Parameters
845 ----------
846 * `tgen` : Topogen object
847 * `routerName` : router for which configuration to be loaded
848 * `save_bkup` : If True, Saves snapshot of FRRCFG_FILE to FRRCFG_BKUP_FILE
849 """
850 return load_config_to_routers(tgen, [routerName], save_bkup)
851
852
853 def reset_with_new_configs(tgen, *cflist):
854 """Reset the router to initial config, then load new configs.
855
856 Resets routers to the initial config state (see `save_initial_config_on_routers()
857 and `reset_config_on_routers()` `), then concat list of router sub-configs together
858 and load onto the routers (see `prep_load_config_to_routers()` and
859 `load_config_to_routers()`)
860 """
861 routers = tgen.routers()
862
863 reset_config_on_routers(tgen)
864 prep_load_config_to_routers(tgen, *cflist)
865 load_config_to_routers(tgen, tgen.routers(), save_bkup=False)
866
867
868 def get_frr_ipv6_linklocal(tgen, router, intf=None, vrf=None):
869 """
870 API to get the link local ipv6 address of a particular interface using
871 FRR command 'show interface'
872
873 * `tgen`: tgen object
874 * `router` : router for which highest interface should be
875 calculated
876 * `intf` : interface for which link-local address needs to be taken
877 * `vrf` : VRF name
878
879 Usage
880 -----
881 linklocal = get_frr_ipv6_linklocal(tgen, router, "intf1", RED_A)
882
883 Returns
884 -------
885 1) array of interface names to link local ips.
886 """
887
888 router_list = tgen.routers()
889 for rname, rnode in router_list.items():
890 if rname != router:
891 continue
892
893 linklocal = []
894
895 if vrf:
896 cmd = "show interface vrf {}".format(vrf)
897 else:
898 cmd = "show interface"
899
900 linklocal = []
901 if vrf:
902 cmd = "show interface vrf {}".format(vrf)
903 else:
904 cmd = "show interface"
905 for chk_ll in range(0, 60):
906 sleep(1 / 4)
907 ifaces = router_list[router].run('vtysh -c "{}"'.format(cmd))
908 # Fix newlines (make them all the same)
909 ifaces = ("\n".join(ifaces.splitlines()) + "\n").splitlines()
910
911 interface = None
912 ll_per_if_count = 0
913 for line in ifaces:
914 # Interface name
915 m = re_search("Interface ([a-zA-Z0-9-]+) is", line)
916 if m:
917 interface = m.group(1).split(" ")[0]
918 ll_per_if_count = 0
919
920 # Interface ip
921 m1 = re_search("inet6 (fe80[:a-fA-F0-9]+/[0-9]+)", line)
922 if m1:
923 local = m1.group(1)
924 ll_per_if_count += 1
925 if ll_per_if_count > 1:
926 linklocal += [["%s-%s" % (interface, ll_per_if_count), local]]
927 else:
928 linklocal += [[interface, local]]
929
930 try:
931 if linklocal:
932 if intf:
933 return [
934 _linklocal[1]
935 for _linklocal in linklocal
936 if _linklocal[0] == intf
937 ][0].split("/")[0]
938 return linklocal
939 except IndexError:
940 continue
941
942 errormsg = "Link local ip missing on router {}".format(router)
943 return errormsg
944
945
946 def generate_support_bundle():
947 """
948 API to generate support bundle on any verification ste failure.
949 it runs a python utility, /usr/lib/frr/generate_support_bundle.py,
950 which basically runs defined CLIs and dumps the data to specified location
951 """
952
953 tgen = get_topogen()
954 router_list = tgen.routers()
955 test_name = os.environ.get("PYTEST_CURRENT_TEST").split(":")[-1].split(" ")[0]
956
957 bundle_procs = {}
958 for rname, rnode in router_list.items():
959 logger.info("Spawn collection of support bundle for %s", rname)
960 dst_bundle = "{}/{}/support_bundles/{}".format(tgen.logdir, rname, test_name)
961 rnode.run("mkdir -p " + dst_bundle)
962
963 gen_sup_cmd = [
964 "/usr/lib/frr/generate_support_bundle.py",
965 "--log-dir=" + dst_bundle,
966 ]
967 bundle_procs[rname] = tgen.net[rname].popen(gen_sup_cmd, stdin=None)
968
969 for rname, rnode in router_list.items():
970 logger.info("Waiting on support bundle for %s", rname)
971 output, error = bundle_procs[rname].communicate()
972 if output:
973 logger.info(
974 "Output from collecting support bundle for %s:\n%s", rname, output
975 )
976 if error:
977 logger.warning(
978 "Error from collecting support bundle for %s:\n%s", rname, error
979 )
980
981 return True
982
983
984 def start_topology(tgen):
985 """
986 Starting topology, create tmp files which are loaded to routers
987 to start daemons and then start routers
988 * `tgen` : topogen object
989 """
990
991 # Starting topology
992 tgen.start_topology()
993
994 # Starting daemons
995
996 router_list = tgen.routers()
997 routers_sorted = sorted(
998 router_list.keys(), key=lambda x: int(re_search("[0-9]+", x).group(0))
999 )
1000
1001 linux_ver = ""
1002 router_list = tgen.routers()
1003 for rname in routers_sorted:
1004 router = router_list[rname]
1005
1006 # It will help in debugging the failures, will give more details on which
1007 # specific kernel version tests are failing
1008 if linux_ver == "":
1009 linux_ver = router.run("uname -a")
1010 logger.info("Logging platform related details: \n %s \n", linux_ver)
1011
1012 try:
1013 os.chdir(tgen.logdir)
1014
1015 # # Creating router named dir and empty zebra.conf bgpd.conf files
1016 # # inside the current directory
1017 # if os.path.isdir("{}".format(rname)):
1018 # os.system("rm -rf {}".format(rname))
1019 # os.mkdir("{}".format(rname))
1020 # os.system("chmod -R go+rw {}".format(rname))
1021 # os.chdir("{}/{}".format(tgen.logdir, rname))
1022 # os.system("touch zebra.conf bgpd.conf")
1023 # else:
1024 # os.mkdir("{}".format(rname))
1025 # os.system("chmod -R go+rw {}".format(rname))
1026 # os.chdir("{}/{}".format(tgen.logdir, rname))
1027 # os.system("touch zebra.conf bgpd.conf")
1028
1029 except IOError as err:
1030 logger.error("I/O error({0}): {1}".format(err.errno, err.strerror))
1031
1032 topo = tgen.json_topo
1033 feature = set()
1034
1035 if "feature" in topo:
1036 feature.update(topo["feature"])
1037
1038 if rname in topo["routers"]:
1039 for key in topo["routers"][rname].keys():
1040 feature.add(key)
1041
1042 for val in topo["routers"][rname]["links"].values():
1043 if "pim" in val:
1044 feature.add("pim")
1045 break
1046 for val in topo["routers"][rname]["links"].values():
1047 if "pim6" in val:
1048 feature.add("pim6")
1049 break
1050 for val in topo["routers"][rname]["links"].values():
1051 if "ospf6" in val:
1052 feature.add("ospf6")
1053 break
1054 if "switches" in topo and rname in topo["switches"]:
1055 for val in topo["switches"][rname]["links"].values():
1056 if "ospf" in val:
1057 feature.add("ospf")
1058 break
1059 if "ospf6" in val:
1060 feature.add("ospf6")
1061 break
1062
1063 # Loading empty zebra.conf file to router, to start the zebra deamon
1064 router.load_config(
1065 TopoRouter.RD_ZEBRA, "{}/{}/zebra.conf".format(tgen.logdir, rname)
1066 )
1067
1068 # Loading empty bgpd.conf file to router, to start the bgp deamon
1069 if "bgp" in feature:
1070 router.load_config(
1071 TopoRouter.RD_BGP, "{}/{}/bgpd.conf".format(tgen.logdir, rname)
1072 )
1073
1074 # Loading empty pimd.conf file to router, to start the pim deamon
1075 if "pim" in feature:
1076 router.load_config(
1077 TopoRouter.RD_PIM, "{}/{}/pimd.conf".format(tgen.logdir, rname)
1078 )
1079
1080 # Loading empty pimd.conf file to router, to start the pim deamon
1081 if "pim6" in feature:
1082 router.load_config(
1083 TopoRouter.RD_PIM6, "{}/{}/pim6d.conf".format(tgen.logdir, rname)
1084 )
1085
1086 if "ospf" in feature:
1087 # Loading empty ospf.conf file to router, to start the ospf deamon
1088 router.load_config(
1089 TopoRouter.RD_OSPF, "{}/{}/ospfd.conf".format(tgen.logdir, rname)
1090 )
1091
1092 if "ospf6" in feature:
1093 # Loading empty ospf.conf file to router, to start the ospf deamon
1094 router.load_config(
1095 TopoRouter.RD_OSPF6, "{}/{}/ospf6d.conf".format(tgen.logdir, rname)
1096 )
1097
1098 # Starting routers
1099 logger.info("Starting all routers once topology is created")
1100 tgen.start_router()
1101
1102
1103 def stop_router(tgen, router):
1104 """
1105 Router"s current config would be saved to /tmp/topotest/<suite>/<router> for each daemon
1106 and router and its daemons would be stopped.
1107
1108 * `tgen` : topogen object
1109 * `router`: Device under test
1110 """
1111
1112 router_list = tgen.routers()
1113
1114 # Saving router config to /etc/frr, which will be loaded to router
1115 # when it starts
1116 router_list[router].vtysh_cmd("write memory")
1117
1118 # Stop router
1119 router_list[router].stop()
1120
1121
1122 def start_router(tgen, router):
1123 """
1124 Router will be started and config would be loaded from /tmp/topotest/<suite>/<router> for each
1125 daemon
1126
1127 * `tgen` : topogen object
1128 * `router`: Device under test
1129 """
1130
1131 logger.debug("Entering lib API: start_router")
1132
1133 try:
1134 router_list = tgen.routers()
1135
1136 # Router and its daemons would be started and config would
1137 # be loaded to router for each daemon from /etc/frr
1138 router_list[router].start()
1139
1140 # Waiting for router to come up
1141 sleep(5)
1142
1143 except Exception as e:
1144 errormsg = traceback.format_exc()
1145 logger.error(errormsg)
1146 return errormsg
1147
1148 logger.debug("Exiting lib API: start_router()")
1149 return True
1150
1151
1152 def number_to_row(routerName):
1153 """
1154 Returns the number for the router.
1155 Calculation based on name a0 = row 0, a1 = row 1, b2 = row 2, z23 = row 23
1156 etc
1157 """
1158 return int(routerName[1:])
1159
1160
1161 def number_to_column(routerName):
1162 """
1163 Returns the number for the router.
1164 Calculation based on name a0 = columnn 0, a1 = column 0, b2= column 1,
1165 z23 = column 26 etc
1166 """
1167 return ord(routerName[0]) - 97
1168
1169
1170 def topo_daemons(tgen, topo=None):
1171 """
1172 Returns daemon list required for the suite based on topojson.
1173 """
1174 daemon_list = []
1175
1176 if topo is None:
1177 topo = tgen.json_topo
1178
1179 router_list = tgen.routers()
1180 routers_sorted = sorted(
1181 router_list.keys(), key=lambda x: int(re_search("[0-9]+", x).group(0))
1182 )
1183
1184 for rtr in routers_sorted:
1185 if "ospf" in topo["routers"][rtr] and "ospfd" not in daemon_list:
1186 daemon_list.append("ospfd")
1187
1188 if "ospf6" in topo["routers"][rtr] and "ospf6d" not in daemon_list:
1189 daemon_list.append("ospf6d")
1190
1191 for val in topo["routers"][rtr]["links"].values():
1192 if "pim" in val and "pimd" not in daemon_list:
1193 daemon_list.append("pimd")
1194 if "pim6" in val and "pim6d" not in daemon_list:
1195 daemon_list.append("pim6d")
1196 if "ospf" in val and "ospfd" not in daemon_list:
1197 daemon_list.append("ospfd")
1198 if "ospf6" in val and "ospf6d" not in daemon_list:
1199 daemon_list.append("ospf6d")
1200 break
1201
1202 return daemon_list
1203
1204
1205 def add_interfaces_to_vlan(tgen, input_dict):
1206 """
1207 Add interfaces to VLAN, we need vlan pakcage to be installed on machine
1208
1209 * `tgen`: tgen onject
1210 * `input_dict` : interfaces to be added to vlans
1211
1212 input_dict= {
1213 "r1":{
1214 "vlan":{
1215 VLAN_1: [{
1216 intf_r1_s1: {
1217 "ip": "10.1.1.1",
1218 "subnet": "255.255.255.0
1219 }
1220 }]
1221 }
1222 }
1223 }
1224
1225 add_interfaces_to_vlan(tgen, input_dict)
1226
1227 """
1228
1229 router_list = tgen.routers()
1230 for dut in input_dict.keys():
1231 rnode = router_list[dut]
1232
1233 if "vlan" in input_dict[dut]:
1234 for vlan, interfaces in input_dict[dut]["vlan"].items():
1235 for intf_dict in interfaces:
1236 for interface, data in intf_dict.items():
1237 # Adding interface to VLAN
1238 vlan_intf = "{}.{}".format(interface, vlan)
1239 cmd = "ip link add link {} name {} type vlan id {}".format(
1240 interface, vlan_intf, vlan
1241 )
1242 logger.info("[DUT: %s]: Running command: %s", dut, cmd)
1243 result = rnode.run(cmd)
1244 logger.info("result %s", result)
1245
1246 # Bringing interface up
1247 cmd = "ip link set {} up".format(vlan_intf)
1248 logger.info("[DUT: %s]: Running command: %s", dut, cmd)
1249 result = rnode.run(cmd)
1250 logger.info("result %s", result)
1251
1252 # Assigning IP address
1253 ifaddr = ipaddress.ip_interface(
1254 "{}/{}".format(
1255 frr_unicode(data["ip"]), frr_unicode(data["subnet"])
1256 )
1257 )
1258
1259 cmd = "ip -{0} a flush {1} scope global && ip a add {2} dev {1} && ip l set {1} up".format(
1260 ifaddr.version, vlan_intf, ifaddr
1261 )
1262 logger.info("[DUT: %s]: Running command: %s", dut, cmd)
1263 result = rnode.run(cmd)
1264 logger.info("result %s", result)
1265
1266
1267 def tcpdump_capture_start(
1268 tgen,
1269 router,
1270 intf,
1271 protocol=None,
1272 grepstr=None,
1273 timeout=0,
1274 options=None,
1275 cap_file=None,
1276 background=True,
1277 ):
1278 """
1279 API to capture network packets using tcp dump.
1280
1281 Packages used :
1282
1283 Parameters
1284 ----------
1285 * `tgen`: topogen object.
1286 * `router`: router on which ping has to be performed.
1287 * `intf` : interface for capture.
1288 * `protocol` : protocol for which packet needs to be captured.
1289 * `grepstr` : string to filter out tcp dump output.
1290 * `timeout` : Time for which packet needs to be captured.
1291 * `options` : options for TCP dump, all tcpdump options can be used.
1292 * `cap_file` : filename to store capture dump.
1293 * `background` : Make tcp dump run in back ground.
1294
1295 Usage
1296 -----
1297 tcpdump_result = tcpdump_dut(tgen, 'r2', intf, protocol='tcp', timeout=20,
1298 options='-A -vv -x > r2bgp.txt ')
1299 Returns
1300 -------
1301 1) True for successful capture
1302 2) errormsg - when tcp dump fails
1303 """
1304
1305 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
1306
1307 rnode = tgen.gears[router]
1308
1309 if timeout > 0:
1310 cmd = "timeout {}".format(timeout)
1311 else:
1312 cmd = ""
1313
1314 cmdargs = "{} tcpdump".format(cmd)
1315
1316 if intf:
1317 cmdargs += " -i {}".format(str(intf))
1318 if protocol:
1319 cmdargs += " {}".format(str(protocol))
1320 if options:
1321 cmdargs += " -s 0 {}".format(str(options))
1322
1323 if cap_file:
1324 file_name = os.path.join(tgen.logdir, router, cap_file)
1325 cmdargs += " -w {}".format(str(file_name))
1326 # Remove existing capture file
1327 rnode.run("rm -rf {}".format(file_name))
1328
1329 if grepstr:
1330 cmdargs += ' | grep "{}"'.format(str(grepstr))
1331
1332 logger.info("Running tcpdump command: [%s]", cmdargs)
1333 if not background:
1334 rnode.run(cmdargs)
1335 else:
1336 # XXX this & is bogus doesn't work
1337 # rnode.run("nohup {} & /dev/null 2>&1".format(cmdargs))
1338 rnode.run("nohup {} > /dev/null 2>&1".format(cmdargs))
1339
1340 # Check if tcpdump process is running
1341 if background:
1342 result = rnode.run("pgrep tcpdump")
1343 logger.debug("ps -ef | grep tcpdump \n {}".format(result))
1344
1345 if not result:
1346 errormsg = "tcpdump is not running {}".format("tcpdump")
1347 return errormsg
1348 else:
1349 logger.info("Packet capture started on %s: interface %s", router, intf)
1350
1351 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
1352 return True
1353
1354
1355 def tcpdump_capture_stop(tgen, router):
1356 """
1357 API to capture network packets using tcp dump.
1358
1359 Packages used :
1360
1361 Parameters
1362 ----------
1363 * `tgen`: topogen object.
1364 * `router`: router on which ping has to be performed.
1365 * `intf` : interface for capture.
1366 * `protocol` : protocol for which packet needs to be captured.
1367 * `grepstr` : string to filter out tcp dump output.
1368 * `timeout` : Time for which packet needs to be captured.
1369 * `options` : options for TCP dump, all tcpdump options can be used.
1370 * `cap2file` : filename to store capture dump.
1371 * `bakgrnd` : Make tcp dump run in back ground.
1372
1373 Usage
1374 -----
1375 tcpdump_result = tcpdump_dut(tgen, 'r2', intf, protocol='tcp', timeout=20,
1376 options='-A -vv -x > r2bgp.txt ')
1377 Returns
1378 -------
1379 1) True for successful capture
1380 2) errormsg - when tcp dump fails
1381 """
1382
1383 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
1384
1385 rnode = tgen.gears[router]
1386
1387 # Check if tcpdump process is running
1388 result = rnode.run("ps -ef | grep tcpdump")
1389 logger.debug("ps -ef | grep tcpdump \n {}".format(result))
1390
1391 if not re_search(r"{}".format("tcpdump"), result):
1392 errormsg = "tcpdump is not running {}".format("tcpdump")
1393 return errormsg
1394 else:
1395 # XXX this doesn't work with micronet
1396 ppid = tgen.net.nameToNode[rnode.name].pid
1397 rnode.run("set +m; pkill -P %s tcpdump &> /dev/null" % ppid)
1398 logger.info("Stopped tcpdump capture")
1399
1400 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
1401 return True
1402
1403
1404 def create_debug_log_config(tgen, input_dict, build=False):
1405 """
1406 Enable/disable debug logs for any protocol with defined debug
1407 options and logs would be saved to created log file
1408
1409 Parameters
1410 ----------
1411 * `tgen` : Topogen object
1412 * `input_dict` : details to enable debug logs for protocols
1413 * `build` : Only for initial setup phase this is set as True.
1414
1415
1416 Usage:
1417 ------
1418 input_dict = {
1419 "r2": {
1420 "debug":{
1421 "log_file" : "debug.log",
1422 "enable": ["pimd", "zebra"],
1423 "disable": {
1424 "bgpd":[
1425 'debug bgp neighbor-events',
1426 'debug bgp updates',
1427 'debug bgp zebra',
1428 ]
1429 }
1430 }
1431 }
1432 }
1433
1434 result = create_debug_log_config(tgen, input_dict)
1435
1436 Returns
1437 -------
1438 True or False
1439 """
1440
1441 result = False
1442 try:
1443 debug_config_dict = {}
1444
1445 for router in input_dict.keys():
1446 debug_config = []
1447 if "debug" in input_dict[router]:
1448 debug_dict = input_dict[router]["debug"]
1449
1450 disable_logs = debug_dict.setdefault("disable", None)
1451 enable_logs = debug_dict.setdefault("enable", None)
1452 log_file = debug_dict.setdefault("log_file", None)
1453
1454 if log_file:
1455 _log_file = os.path.join(tgen.logdir, log_file)
1456 debug_config.append("log file {} \n".format(_log_file))
1457
1458 if type(enable_logs) is list:
1459 for daemon in enable_logs:
1460 for debug_log in DEBUG_LOGS[daemon]:
1461 debug_config.append("{}".format(debug_log))
1462 elif type(enable_logs) is dict:
1463 for daemon, debug_logs in enable_logs.items():
1464 for debug_log in debug_logs:
1465 debug_config.append("{}".format(debug_log))
1466
1467 if type(disable_logs) is list:
1468 for daemon in disable_logs:
1469 for debug_log in DEBUG_LOGS[daemon]:
1470 debug_config.append("no {}".format(debug_log))
1471 elif type(disable_logs) is dict:
1472 for daemon, debug_logs in disable_logs.items():
1473 for debug_log in debug_logs:
1474 debug_config.append("no {}".format(debug_log))
1475 if debug_config:
1476 debug_config_dict[router] = debug_config
1477
1478 result = create_common_configurations(
1479 tgen, debug_config_dict, "debug_log_config", build=build
1480 )
1481 except InvalidCLIError:
1482 # Traceback
1483 errormsg = traceback.format_exc()
1484 logger.error(errormsg)
1485 return errormsg
1486
1487 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
1488 return result
1489
1490
1491 #############################################
1492 # Common APIs, will be used by all protocols
1493 #############################################
1494
1495
1496 def create_vrf_cfg(tgen, topo, input_dict=None, build=False):
1497 """
1498 Create vrf configuration for created topology. VRF
1499 configuration is provided in input json file.
1500
1501 VRF config is done in Linux Kernel:
1502 * Create VRF
1503 * Attach interface to VRF
1504 * Bring up VRF
1505
1506 Parameters
1507 ----------
1508 * `tgen` : Topogen object
1509 * `topo` : json file data
1510 * `input_dict` : Input dict data, required when configuring
1511 from testcase
1512 * `build` : Only for initial setup phase this is set as True.
1513
1514 Usage
1515 -----
1516 input_dict={
1517 "r3": {
1518 "links": {
1519 "r2-link1": {"ipv4": "auto", "ipv6": "auto", "vrf": "RED_A"},
1520 "r2-link2": {"ipv4": "auto", "ipv6": "auto", "vrf": "RED_B"},
1521 "r2-link3": {"ipv4": "auto", "ipv6": "auto", "vrf": "BLUE_A"},
1522 "r2-link4": {"ipv4": "auto", "ipv6": "auto", "vrf": "BLUE_B"},
1523 },
1524 "vrfs":[
1525 {
1526 "name": "RED_A",
1527 "id": "1"
1528 },
1529 {
1530 "name": "RED_B",
1531 "id": "2"
1532 },
1533 {
1534 "name": "BLUE_A",
1535 "id": "3",
1536 "delete": True
1537 },
1538 {
1539 "name": "BLUE_B",
1540 "id": "4"
1541 }
1542 ]
1543 }
1544 }
1545 result = create_vrf_cfg(tgen, topo, input_dict)
1546
1547 Returns
1548 -------
1549 True or False
1550 """
1551 result = True
1552 if not input_dict:
1553 input_dict = deepcopy(topo)
1554 else:
1555 input_dict = deepcopy(input_dict)
1556
1557 try:
1558 config_data_dict = {}
1559
1560 for c_router, c_data in input_dict.items():
1561 rnode = tgen.gears[c_router]
1562 config_data = []
1563 if "vrfs" in c_data:
1564 for vrf in c_data["vrfs"]:
1565 name = vrf.setdefault("name", None)
1566 table_id = vrf.setdefault("id", None)
1567 del_action = vrf.setdefault("delete", False)
1568
1569 if del_action:
1570 # Kernel cmd- Add VRF and table
1571 cmd = "ip link del {} type vrf table {}".format(
1572 vrf["name"], vrf["id"]
1573 )
1574
1575 logger.info("[DUT: %s]: Running kernel cmd [%s]", c_router, cmd)
1576 rnode.run(cmd)
1577
1578 # Kernel cmd - Bring down VRF
1579 cmd = "ip link set dev {} down".format(name)
1580 logger.info("[DUT: %s]: Running kernel cmd [%s]", c_router, cmd)
1581 rnode.run(cmd)
1582
1583 else:
1584 if name and table_id:
1585 # Kernel cmd- Add VRF and table
1586 cmd = "ip link add {} type vrf table {}".format(
1587 name, table_id
1588 )
1589 logger.info(
1590 "[DUT: %s]: Running kernel cmd " "[%s]", c_router, cmd
1591 )
1592 rnode.run(cmd)
1593
1594 # Kernel cmd - Bring up VRF
1595 cmd = "ip link set dev {} up".format(name)
1596 logger.info(
1597 "[DUT: %s]: Running kernel " "cmd [%s]", c_router, cmd
1598 )
1599 rnode.run(cmd)
1600
1601 for vrf in c_data["vrfs"]:
1602 vni = vrf.setdefault("vni", None)
1603 del_vni = vrf.setdefault("no_vni", None)
1604
1605 if "links" in c_data:
1606 for destRouterLink, data in sorted(c_data["links"].items()):
1607 # Loopback interfaces
1608 if "type" in data and data["type"] == "loopback":
1609 interface_name = destRouterLink
1610 else:
1611 interface_name = data["interface"]
1612
1613 if "vrf" in data:
1614 vrf_list = data["vrf"]
1615
1616 if type(vrf_list) is not list:
1617 vrf_list = [vrf_list]
1618
1619 for _vrf in vrf_list:
1620 cmd = "ip link set {} master {}".format(
1621 interface_name, _vrf
1622 )
1623
1624 logger.info(
1625 "[DUT: %s]: Running" " kernel cmd [%s]",
1626 c_router,
1627 cmd,
1628 )
1629 rnode.run(cmd)
1630
1631 if vni:
1632 config_data.append("vrf {}".format(vrf["name"]))
1633 cmd = "vni {}".format(vni)
1634 config_data.append(cmd)
1635
1636 if del_vni:
1637 config_data.append("vrf {}".format(vrf["name"]))
1638 cmd = "no vni {}".format(del_vni)
1639 config_data.append(cmd)
1640
1641 if config_data:
1642 config_data_dict[c_router] = config_data
1643
1644 result = create_common_configurations(
1645 tgen, config_data_dict, "vrf", build=build
1646 )
1647
1648 except InvalidCLIError:
1649 # Traceback
1650 errormsg = traceback.format_exc()
1651 logger.error(errormsg)
1652 return errormsg
1653
1654 return result
1655
1656
1657 def create_interface_in_kernel(
1658 tgen, dut, name, ip_addr, vrf=None, netmask=None, create=True
1659 ):
1660 """
1661 Cretae interfaces in kernel for ipv4/ipv6
1662 Config is done in Linux Kernel:
1663
1664 Parameters
1665 ----------
1666 * `tgen` : Topogen object
1667 * `dut` : Device for which interfaces to be added
1668 * `name` : interface name
1669 * `ip_addr` : ip address for interface
1670 * `vrf` : VRF name, to which interface will be associated
1671 * `netmask` : netmask value, default is None
1672 * `create`: Create interface in kernel, if created then no need
1673 to create
1674 """
1675
1676 rnode = tgen.gears[dut]
1677
1678 if create:
1679 cmd = "ip link show {0} >/dev/null || ip link add {0} type dummy".format(name)
1680 rnode.run(cmd)
1681
1682 if not netmask:
1683 ifaddr = ipaddress.ip_interface(frr_unicode(ip_addr))
1684 else:
1685 ifaddr = ipaddress.ip_interface(
1686 "{}/{}".format(frr_unicode(ip_addr), frr_unicode(netmask))
1687 )
1688 cmd = "ip -{0} a flush {1} scope global && ip a add {2} dev {1} && ip l set {1} up".format(
1689 ifaddr.version, name, ifaddr
1690 )
1691 logger.info("[DUT: %s]: Running command: %s", dut, cmd)
1692 rnode.run(cmd)
1693
1694 if vrf:
1695 cmd = "ip link set {} master {}".format(name, vrf)
1696 rnode.run(cmd)
1697
1698
1699 def shutdown_bringup_interface_in_kernel(tgen, dut, intf_name, ifaceaction=False):
1700 """
1701 Cretae interfaces in kernel for ipv4/ipv6
1702 Config is done in Linux Kernel:
1703
1704 Parameters
1705 ----------
1706 * `tgen` : Topogen object
1707 * `dut` : Device for which interfaces to be added
1708 * `intf_name` : interface name
1709 * `ifaceaction` : False to shutdown and True to bringup the
1710 ineterface
1711 """
1712
1713 rnode = tgen.gears[dut]
1714
1715 cmd = "ip link set dev"
1716 if ifaceaction:
1717 action = "up"
1718 cmd = "{} {} {}".format(cmd, intf_name, action)
1719 else:
1720 action = "down"
1721 cmd = "{} {} {}".format(cmd, intf_name, action)
1722
1723 logger.info("[DUT: %s]: Running command: %s", dut, cmd)
1724 rnode.run(cmd)
1725
1726
1727 def validate_ip_address(ip_address):
1728 """
1729 Validates the type of ip address
1730 Parameters
1731 ----------
1732 * `ip_address`: IPv4/IPv6 address
1733 Returns
1734 -------
1735 Type of address as string
1736 """
1737
1738 if "/" in ip_address:
1739 ip_address = ip_address.split("/")[0]
1740
1741 v4 = True
1742 v6 = True
1743 try:
1744 socket.inet_aton(ip_address)
1745 except socket.error as error:
1746 logger.debug("Not a valid IPv4 address")
1747 v4 = False
1748 else:
1749 return "ipv4"
1750
1751 try:
1752 socket.inet_pton(socket.AF_INET6, ip_address)
1753 except socket.error as error:
1754 logger.debug("Not a valid IPv6 address")
1755 v6 = False
1756 else:
1757 return "ipv6"
1758
1759 if not v4 and not v6:
1760 raise Exception(
1761 "InvalidIpAddr", "%s is neither valid IPv4 or IPv6" " address" % ip_address
1762 )
1763
1764
1765 def check_address_types(addr_type=None):
1766 """
1767 Checks environment variable set and compares with the current address type
1768 """
1769
1770 addr_types_env = os.environ.get("ADDRESS_TYPES")
1771 if not addr_types_env:
1772 addr_types_env = "dual"
1773
1774 if addr_types_env == "dual":
1775 addr_types = ["ipv4", "ipv6"]
1776 elif addr_types_env == "ipv4":
1777 addr_types = ["ipv4"]
1778 elif addr_types_env == "ipv6":
1779 addr_types = ["ipv6"]
1780
1781 if addr_type is None:
1782 return addr_types
1783
1784 if addr_type not in addr_types:
1785 logger.debug(
1786 "{} not in supported/configured address types {}".format(
1787 addr_type, addr_types
1788 )
1789 )
1790 return False
1791
1792 return True
1793
1794
1795 def generate_ips(network, no_of_ips):
1796 """
1797 Returns list of IPs.
1798 based on start_ip and no_of_ips
1799
1800 * `network` : from here the ip will start generating,
1801 start_ip will be
1802 * `no_of_ips` : these many IPs will be generated
1803 """
1804 ipaddress_list = []
1805 if type(network) is not list:
1806 network = [network]
1807
1808 for start_ipaddr in network:
1809 if "/" in start_ipaddr:
1810 start_ip = start_ipaddr.split("/")[0]
1811 mask = int(start_ipaddr.split("/")[1])
1812 else:
1813 logger.debug("start_ipaddr {} must have a / in it".format(start_ipaddr))
1814 assert 0
1815
1816 addr_type = validate_ip_address(start_ip)
1817 if addr_type == "ipv4":
1818 if start_ip == "0.0.0.0" and mask == 0 and no_of_ips == 1:
1819 ipaddress_list.append("{}/{}".format(start_ip, mask))
1820 return ipaddress_list
1821 start_ip = ipaddress.IPv4Address(frr_unicode(start_ip))
1822 step = 2 ** (32 - mask)
1823 elif addr_type == "ipv6":
1824 if start_ip == "0::0" and mask == 0 and no_of_ips == 1:
1825 ipaddress_list.append("{}/{}".format(start_ip, mask))
1826 return ipaddress_list
1827 start_ip = ipaddress.IPv6Address(frr_unicode(start_ip))
1828 step = 2 ** (128 - mask)
1829 else:
1830 return []
1831
1832 next_ip = start_ip
1833 count = 0
1834 while count < no_of_ips:
1835 ipaddress_list.append("{}/{}".format(next_ip, mask))
1836 if addr_type == "ipv6":
1837 next_ip = ipaddress.IPv6Address(int(next_ip) + step)
1838 else:
1839 next_ip += step
1840 count += 1
1841
1842 return ipaddress_list
1843
1844
1845 def find_interface_with_greater_ip(topo, router, loopback=True, interface=True):
1846 """
1847 Returns highest interface ip for ipv4/ipv6. If loopback is there then
1848 it will return highest IP from loopback IPs otherwise from physical
1849 interface IPs.
1850 * `topo` : json file data
1851 * `router` : router for which highest interface should be calculated
1852 """
1853
1854 link_data = topo["routers"][router]["links"]
1855 lo_list = []
1856 interfaces_list = []
1857 lo_exists = False
1858 for destRouterLink, data in sorted(link_data.items()):
1859 if loopback:
1860 if "type" in data and data["type"] == "loopback":
1861 lo_exists = True
1862 ip_address = topo["routers"][router]["links"][destRouterLink][
1863 "ipv4"
1864 ].split("/")[0]
1865 lo_list.append(ip_address)
1866 if interface:
1867 ip_address = topo["routers"][router]["links"][destRouterLink]["ipv4"].split(
1868 "/"
1869 )[0]
1870 interfaces_list.append(ip_address)
1871
1872 if lo_exists:
1873 return sorted(lo_list)[-1]
1874
1875 return sorted(interfaces_list)[-1]
1876
1877
1878 def write_test_header(tc_name):
1879 """Display message at beginning of test case"""
1880 count = 20
1881 logger.info("*" * (len(tc_name) + count))
1882 step("START -> Testcase : %s" % tc_name, reset=True)
1883 logger.info("*" * (len(tc_name) + count))
1884
1885
1886 def write_test_footer(tc_name):
1887 """Display message at end of test case"""
1888 count = 21
1889 logger.info("=" * (len(tc_name) + count))
1890 logger.info("Testcase : %s -> PASSED", tc_name)
1891 logger.info("=" * (len(tc_name) + count))
1892
1893
1894 def interface_status(tgen, topo, input_dict):
1895 """
1896 Delete ip route maps from device
1897 * `tgen` : Topogen object
1898 * `topo` : json file data
1899 * `input_dict` : for which router, route map has to be deleted
1900 Usage
1901 -----
1902 input_dict = {
1903 "r3": {
1904 "interface_list": ['eth1-r1-r2', 'eth2-r1-r3'],
1905 "status": "down"
1906 }
1907 }
1908 Returns
1909 -------
1910 errormsg(str) or True
1911 """
1912 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
1913
1914 try:
1915 rlist = []
1916
1917 for router in input_dict.keys():
1918
1919 interface_list = input_dict[router]["interface_list"]
1920 status = input_dict[router].setdefault("status", "up")
1921 for intf in interface_list:
1922 rnode = tgen.gears[router]
1923 interface_set_status(rnode, intf, status)
1924
1925 rlist.append(router)
1926
1927 # Load config to routers
1928 load_config_to_routers(tgen, rlist)
1929
1930 except Exception as e:
1931 errormsg = traceback.format_exc()
1932 logger.error(errormsg)
1933 return errormsg
1934
1935 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
1936 return True
1937
1938
1939 def retry(retry_timeout, initial_wait=0, expected=True, diag_pct=0.75):
1940 """
1941 Fixture: Retries function while it's return value is an errormsg (str), False, or it raises an exception.
1942
1943 * `retry_timeout`: Retry for at least this many seconds; after waiting initial_wait seconds
1944 * `initial_wait`: Sleeps for this many seconds before first executing function
1945 * `expected`: if False then the return logic is inverted, except for exceptions,
1946 (i.e., a False or errmsg (str) function return ends the retry loop,
1947 and returns that False or str value)
1948 * `diag_pct`: Percentage of `retry_timeout` to keep testing after negative result would have
1949 been returned in order to see if a positive result comes after. This is an
1950 important diagnostic tool, and normally should not be disabled. Calls to wrapped
1951 functions though, can override the `diag_pct` value to make it larger in case more
1952 diagnostic retrying is appropriate.
1953 """
1954
1955 def _retry(func):
1956 @wraps(func)
1957 def func_retry(*args, **kwargs):
1958 # We will continue to retry diag_pct of the timeout value to see if test would have passed with a
1959 # longer retry timeout value.
1960 saved_failure = None
1961
1962 retry_sleep = 2
1963
1964 # Allow the wrapped function's args to override the fixtures
1965 _retry_timeout = kwargs.pop("retry_timeout", retry_timeout)
1966 _expected = kwargs.pop("expected", expected)
1967 _initial_wait = kwargs.pop("initial_wait", initial_wait)
1968 _diag_pct = kwargs.pop("diag_pct", diag_pct)
1969
1970 start_time = datetime.now()
1971 retry_until = datetime.now() + timedelta(
1972 seconds=_retry_timeout + _initial_wait
1973 )
1974
1975 if initial_wait > 0:
1976 logger.info("Waiting for [%s]s as initial delay", initial_wait)
1977 sleep(initial_wait)
1978
1979 invert_logic = not _expected
1980 while True:
1981 seconds_left = (retry_until - datetime.now()).total_seconds()
1982 try:
1983 ret = func(*args, **kwargs)
1984 logger.debug("Function returned %s", ret)
1985
1986 negative_result = ret is False or is_string(ret)
1987 if negative_result == invert_logic:
1988 # Simple case, successful result in time
1989 if not saved_failure:
1990 return ret
1991
1992 # Positive result, but happened after timeout failure, very important to
1993 # note for fixing tests.
1994 logger.warning(
1995 "RETRY DIAGNOSTIC: SUCCEED after FAILED with requested timeout of %.1fs; however, succeeded in %.1fs, investigate timeout timing",
1996 _retry_timeout,
1997 (datetime.now() - start_time).total_seconds(),
1998 )
1999 if isinstance(saved_failure, Exception):
2000 raise saved_failure # pylint: disable=E0702
2001 return saved_failure
2002
2003 except Exception as error:
2004 logger.info("Function raised exception: %s", str(error))
2005 ret = error
2006
2007 if seconds_left < 0 and saved_failure:
2008 logger.info(
2009 "RETRY DIAGNOSTIC: Retry timeout reached, still failing"
2010 )
2011 if isinstance(saved_failure, Exception):
2012 raise saved_failure # pylint: disable=E0702
2013 return saved_failure
2014
2015 if seconds_left < 0:
2016 logger.info("Retry timeout of %ds reached", _retry_timeout)
2017
2018 saved_failure = ret
2019 retry_extra_delta = timedelta(
2020 seconds=seconds_left + _retry_timeout * _diag_pct
2021 )
2022 retry_until = datetime.now() + retry_extra_delta
2023 seconds_left = retry_extra_delta.total_seconds()
2024
2025 # Generate bundle after setting remaining diagnostic retry time
2026 generate_support_bundle()
2027
2028 # If user has disabled diagnostic retries return now
2029 if not _diag_pct:
2030 if isinstance(saved_failure, Exception):
2031 raise saved_failure
2032 return saved_failure
2033
2034 if saved_failure:
2035 logger.info(
2036 "RETRY DIAG: [failure] Sleeping %ds until next retry with %.1f retry time left - too see if timeout was too short",
2037 retry_sleep,
2038 seconds_left,
2039 )
2040 else:
2041 logger.info(
2042 "Sleeping %ds until next retry with %.1f retry time left",
2043 retry_sleep,
2044 seconds_left,
2045 )
2046 sleep(retry_sleep)
2047
2048 func_retry._original = func
2049 return func_retry
2050
2051 return _retry
2052
2053
2054 class Stepper:
2055 """
2056 Prints step number for the test case step being executed
2057 """
2058
2059 count = 1
2060
2061 def __call__(self, msg, reset):
2062 if reset:
2063 Stepper.count = 1
2064 logger.info(msg)
2065 else:
2066 logger.info("STEP %s: '%s'", Stepper.count, msg)
2067 Stepper.count += 1
2068
2069
2070 def step(msg, reset=False):
2071 """
2072 Call Stepper to print test steps. Need to reset at the beginning of test.
2073 * ` msg` : Step message body.
2074 * `reset` : Reset step count to 1 when set to True.
2075 """
2076 _step = Stepper()
2077 _step(msg, reset)
2078
2079
2080 def do_countdown(secs):
2081 """
2082 Countdown timer display
2083 """
2084 for i in range(secs, 0, -1):
2085 sys.stdout.write("{} ".format(str(i)))
2086 sys.stdout.flush()
2087 sleep(1)
2088 return
2089
2090
2091 #############################################
2092 # These APIs, will used by testcase
2093 #############################################
2094 def create_interfaces_cfg(tgen, topo, build=False):
2095 """
2096 Create interface configuration for created topology. Basic Interface
2097 configuration is provided in input json file.
2098
2099 Parameters
2100 ----------
2101 * `tgen` : Topogen object
2102 * `topo` : json file data
2103 * `build` : Only for initial setup phase this is set as True.
2104
2105 Returns
2106 -------
2107 True or False
2108 """
2109
2110 def _create_interfaces_ospf_cfg(ospf, c_data, data, ospf_keywords):
2111 interface_data = []
2112 ip_ospf = "ipv6 ospf6" if ospf == "ospf6" else "ip ospf"
2113 for keyword in ospf_keywords:
2114 if keyword in data[ospf]:
2115 intf_ospf_value = c_data["links"][destRouterLink][ospf][keyword]
2116 if "delete" in data and data["delete"]:
2117 interface_data.append(
2118 "no {} {}".format(ip_ospf, keyword.replace("_", "-"))
2119 )
2120 else:
2121 interface_data.append(
2122 "{} {} {}".format(
2123 ip_ospf, keyword.replace("_", "-"), intf_ospf_value
2124 )
2125 )
2126 return interface_data
2127
2128 result = False
2129 topo = deepcopy(topo)
2130
2131 try:
2132 interface_data_dict = {}
2133
2134 for c_router, c_data in topo.items():
2135 interface_data = []
2136 for destRouterLink, data in sorted(c_data["links"].items()):
2137 # Loopback interfaces
2138 if "type" in data and data["type"] == "loopback":
2139 interface_name = destRouterLink
2140 else:
2141 interface_name = data["interface"]
2142
2143 interface_data.append("interface {}".format(str(interface_name)))
2144
2145 if "ipv4" in data:
2146 intf_addr = c_data["links"][destRouterLink]["ipv4"]
2147
2148 if "delete" in data and data["delete"]:
2149 interface_data.append("no ip address {}".format(intf_addr))
2150 else:
2151 interface_data.append("ip address {}".format(intf_addr))
2152 if "ipv6" in data:
2153 intf_addr = c_data["links"][destRouterLink]["ipv6"]
2154
2155 if "delete" in data and data["delete"]:
2156 interface_data.append("no ipv6 address {}".format(intf_addr))
2157 else:
2158 interface_data.append("ipv6 address {}".format(intf_addr))
2159
2160 # Wait for vrf interfaces to get link local address once they are up
2161 if (
2162 not destRouterLink == "lo"
2163 and "vrf" in topo[c_router]["links"][destRouterLink]
2164 ):
2165 vrf = topo[c_router]["links"][destRouterLink]["vrf"]
2166 intf = topo[c_router]["links"][destRouterLink]["interface"]
2167 ll = get_frr_ipv6_linklocal(tgen, c_router, intf=intf, vrf=vrf)
2168
2169 if "ipv6-link-local" in data:
2170 intf_addr = c_data["links"][destRouterLink]["ipv6-link-local"]
2171
2172 if "delete" in data and data["delete"]:
2173 interface_data.append("no ipv6 address {}".format(intf_addr))
2174 else:
2175 interface_data.append("ipv6 address {}\n".format(intf_addr))
2176
2177 ospf_keywords = [
2178 "hello_interval",
2179 "dead_interval",
2180 "network",
2181 "priority",
2182 "cost",
2183 "mtu_ignore",
2184 ]
2185 if "ospf" in data:
2186 interface_data += _create_interfaces_ospf_cfg(
2187 "ospf", c_data, data, ospf_keywords + ["area"]
2188 )
2189 if "ospf6" in data:
2190 interface_data += _create_interfaces_ospf_cfg(
2191 "ospf6", c_data, data, ospf_keywords + ["area"]
2192 )
2193 if interface_data:
2194 interface_data_dict[c_router] = interface_data
2195
2196 result = create_common_configurations(
2197 tgen, interface_data_dict, "interface_config", build=build
2198 )
2199
2200 except InvalidCLIError:
2201 # Traceback
2202 errormsg = traceback.format_exc()
2203 logger.error(errormsg)
2204 return errormsg
2205
2206 return result
2207
2208
2209 def create_static_routes(tgen, input_dict, build=False):
2210 """
2211 Create static routes for given router as defined in input_dict
2212
2213 Parameters
2214 ----------
2215 * `tgen` : Topogen object
2216 * `input_dict` : Input dict data, required when configuring from testcase
2217 * `build` : Only for initial setup phase this is set as True.
2218
2219 Usage
2220 -----
2221 input_dict should be in the format below:
2222 # static_routes: list of all routes
2223 # network: network address
2224 # no_of_ip: number of next-hop address that will be configured
2225 # admin_distance: admin distance for route/routes.
2226 # next_hop: starting next-hop address
2227 # tag: tag id for static routes
2228 # vrf: VRF name in which static routes needs to be created
2229 # delete: True if config to be removed. Default False.
2230
2231 Example:
2232 "routers": {
2233 "r1": {
2234 "static_routes": [
2235 {
2236 "network": "100.0.20.1/32",
2237 "no_of_ip": 9,
2238 "admin_distance": 100,
2239 "next_hop": "10.0.0.1",
2240 "tag": 4001,
2241 "vrf": "RED_A"
2242 "delete": true
2243 }
2244 ]
2245 }
2246 }
2247
2248 Returns
2249 -------
2250 errormsg(str) or True
2251 """
2252 result = False
2253 logger.debug("Entering lib API: create_static_routes()")
2254 input_dict = deepcopy(input_dict)
2255
2256 try:
2257 static_routes_list_dict = {}
2258
2259 for router in input_dict.keys():
2260 if "static_routes" not in input_dict[router]:
2261 errormsg = "static_routes not present in input_dict"
2262 logger.info(errormsg)
2263 continue
2264
2265 static_routes_list = []
2266
2267 static_routes = input_dict[router]["static_routes"]
2268 for static_route in static_routes:
2269 del_action = static_route.setdefault("delete", False)
2270 no_of_ip = static_route.setdefault("no_of_ip", 1)
2271 network = static_route.setdefault("network", [])
2272 if type(network) is not list:
2273 network = [network]
2274
2275 admin_distance = static_route.setdefault("admin_distance", None)
2276 tag = static_route.setdefault("tag", None)
2277 vrf = static_route.setdefault("vrf", None)
2278 interface = static_route.setdefault("interface", None)
2279 next_hop = static_route.setdefault("next_hop", None)
2280 nexthop_vrf = static_route.setdefault("nexthop_vrf", None)
2281
2282 ip_list = generate_ips(network, no_of_ip)
2283 for ip in ip_list:
2284 addr_type = validate_ip_address(ip)
2285
2286 if addr_type == "ipv4":
2287 cmd = "ip route {}".format(ip)
2288 else:
2289 cmd = "ipv6 route {}".format(ip)
2290
2291 if interface:
2292 cmd = "{} {}".format(cmd, interface)
2293
2294 if next_hop:
2295 cmd = "{} {}".format(cmd, next_hop)
2296
2297 if nexthop_vrf:
2298 cmd = "{} nexthop-vrf {}".format(cmd, nexthop_vrf)
2299
2300 if vrf:
2301 cmd = "{} vrf {}".format(cmd, vrf)
2302
2303 if tag:
2304 cmd = "{} tag {}".format(cmd, str(tag))
2305
2306 if admin_distance:
2307 cmd = "{} {}".format(cmd, admin_distance)
2308
2309 if del_action:
2310 cmd = "no {}".format(cmd)
2311
2312 static_routes_list.append(cmd)
2313
2314 if static_routes_list:
2315 static_routes_list_dict[router] = static_routes_list
2316
2317 result = create_common_configurations(
2318 tgen, static_routes_list_dict, "static_route", build=build
2319 )
2320
2321 except InvalidCLIError:
2322 # Traceback
2323 errormsg = traceback.format_exc()
2324 logger.error(errormsg)
2325 return errormsg
2326
2327 logger.debug("Exiting lib API: create_static_routes()")
2328 return result
2329
2330
2331 def create_prefix_lists(tgen, input_dict, build=False):
2332 """
2333 Create ip prefix lists as per the config provided in input
2334 JSON or input_dict
2335 Parameters
2336 ----------
2337 * `tgen` : Topogen object
2338 * `input_dict` : Input dict data, required when configuring from testcase
2339 * `build` : Only for initial setup phase this is set as True.
2340 Usage
2341 -----
2342 # pf_lists_1: name of prefix-list, user defined
2343 # seqid: prefix-list seqid, auto-generated if not given by user
2344 # network: criteria for applying prefix-list
2345 # action: permit/deny
2346 # le: less than or equal number of bits
2347 # ge: greater than or equal number of bits
2348 Example
2349 -------
2350 input_dict = {
2351 "r1": {
2352 "prefix_lists":{
2353 "ipv4": {
2354 "pf_list_1": [
2355 {
2356 "seqid": 10,
2357 "network": "any",
2358 "action": "permit",
2359 "le": "32",
2360 "ge": "30",
2361 "delete": True
2362 }
2363 ]
2364 }
2365 }
2366 }
2367 }
2368 Returns
2369 -------
2370 errormsg or True
2371 """
2372
2373 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
2374 result = False
2375 try:
2376 config_data_dict = {}
2377
2378 for router in input_dict.keys():
2379 if "prefix_lists" not in input_dict[router]:
2380 errormsg = "prefix_lists not present in input_dict"
2381 logger.debug(errormsg)
2382 continue
2383
2384 config_data = []
2385 prefix_lists = input_dict[router]["prefix_lists"]
2386 for addr_type, prefix_data in prefix_lists.items():
2387 if not check_address_types(addr_type):
2388 continue
2389
2390 for prefix_name, prefix_list in prefix_data.items():
2391 for prefix_dict in prefix_list:
2392 if "action" not in prefix_dict or "network" not in prefix_dict:
2393 errormsg = "'action' or network' missing in" " input_dict"
2394 return errormsg
2395
2396 network_addr = prefix_dict["network"]
2397 action = prefix_dict["action"]
2398 le = prefix_dict.setdefault("le", None)
2399 ge = prefix_dict.setdefault("ge", None)
2400 seqid = prefix_dict.setdefault("seqid", None)
2401 del_action = prefix_dict.setdefault("delete", False)
2402 if seqid is None:
2403 seqid = get_seq_id("prefix_lists", router, prefix_name)
2404 else:
2405 set_seq_id("prefix_lists", router, seqid, prefix_name)
2406
2407 if addr_type == "ipv4":
2408 protocol = "ip"
2409 else:
2410 protocol = "ipv6"
2411
2412 cmd = "{} prefix-list {} seq {} {} {}".format(
2413 protocol, prefix_name, seqid, action, network_addr
2414 )
2415 if le:
2416 cmd = "{} le {}".format(cmd, le)
2417 if ge:
2418 cmd = "{} ge {}".format(cmd, ge)
2419
2420 if del_action:
2421 cmd = "no {}".format(cmd)
2422
2423 config_data.append(cmd)
2424 if config_data:
2425 config_data_dict[router] = config_data
2426
2427 result = create_common_configurations(
2428 tgen, config_data_dict, "prefix_list", build=build
2429 )
2430
2431 except InvalidCLIError:
2432 # Traceback
2433 errormsg = traceback.format_exc()
2434 logger.error(errormsg)
2435 return errormsg
2436
2437 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
2438 return result
2439
2440
2441 def create_route_maps(tgen, input_dict, build=False):
2442 """
2443 Create route-map on the devices as per the arguments passed
2444 Parameters
2445 ----------
2446 * `tgen` : Topogen object
2447 * `input_dict` : Input dict data, required when configuring from testcase
2448 * `build` : Only for initial setup phase this is set as True.
2449 Usage
2450 -----
2451 # route_maps: key, value pair for route-map name and its attribute
2452 # rmap_match_prefix_list_1: user given name for route-map
2453 # action: PERMIT/DENY
2454 # match: key,value pair for match criteria. prefix_list, community-list,
2455 large-community-list or tag. Only one option at a time.
2456 # prefix_list: name of prefix list
2457 # large-community-list: name of large community list
2458 # community-ist: name of community list
2459 # tag: tag id for static routes
2460 # set: key, value pair for modifying route attributes
2461 # localpref: preference value for the network
2462 # med: metric value advertised for AS
2463 # aspath: set AS path value
2464 # weight: weight for the route
2465 # community: standard community value to be attached
2466 # large_community: large community value to be attached
2467 # community_additive: if set to "additive", adds community/large-community
2468 value to the existing values of the network prefix
2469 Example:
2470 --------
2471 input_dict = {
2472 "r1": {
2473 "route_maps": {
2474 "rmap_match_prefix_list_1": [
2475 {
2476 "action": "PERMIT",
2477 "match": {
2478 "ipv4": {
2479 "prefix_list": "pf_list_1"
2480 }
2481 "ipv6": {
2482 "prefix_list": "pf_list_1"
2483 }
2484 "large-community-list": {
2485 "id": "community_1",
2486 "exact_match": True
2487 }
2488 "community_list": {
2489 "id": "community_2",
2490 "exact_match": True
2491 }
2492 "tag": "tag_id"
2493 },
2494 "set": {
2495 "locPrf": 150,
2496 "metric": 30,
2497 "path": {
2498 "num": 20000,
2499 "action": "prepend",
2500 },
2501 "weight": 500,
2502 "community": {
2503 "num": "1:2 2:3",
2504 "action": additive
2505 }
2506 "large_community": {
2507 "num": "1:2:3 4:5;6",
2508 "action": additive
2509 },
2510 }
2511 }
2512 ]
2513 }
2514 }
2515 }
2516 Returns
2517 -------
2518 errormsg(str) or True
2519 """
2520
2521 result = False
2522 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
2523 input_dict = deepcopy(input_dict)
2524 try:
2525 rmap_data_dict = {}
2526
2527 for router in input_dict.keys():
2528 if "route_maps" not in input_dict[router]:
2529 logger.debug("route_maps not present in input_dict")
2530 continue
2531 rmap_data = []
2532 for rmap_name, rmap_value in input_dict[router]["route_maps"].items():
2533
2534 for rmap_dict in rmap_value:
2535 del_action = rmap_dict.setdefault("delete", False)
2536
2537 if del_action:
2538 rmap_data.append("no route-map {}".format(rmap_name))
2539 continue
2540
2541 if "action" not in rmap_dict:
2542 errormsg = "action not present in input_dict"
2543 logger.error(errormsg)
2544 return False
2545
2546 rmap_action = rmap_dict.setdefault("action", "deny")
2547
2548 seq_id = rmap_dict.setdefault("seq_id", None)
2549 if seq_id is None:
2550 seq_id = get_seq_id("route_maps", router, rmap_name)
2551 else:
2552 set_seq_id("route_maps", router, seq_id, rmap_name)
2553
2554 rmap_data.append(
2555 "route-map {} {} {}".format(rmap_name, rmap_action, seq_id)
2556 )
2557
2558 if "continue" in rmap_dict:
2559 continue_to = rmap_dict["continue"]
2560 if continue_to:
2561 rmap_data.append("on-match goto {}".format(continue_to))
2562 else:
2563 logger.error(
2564 "In continue, 'route-map entry "
2565 "sequence number' is not provided"
2566 )
2567 return False
2568
2569 if "goto" in rmap_dict:
2570 go_to = rmap_dict["goto"]
2571 if go_to:
2572 rmap_data.append("on-match goto {}".format(go_to))
2573 else:
2574 logger.error(
2575 "In goto, 'Goto Clause number' is not" " provided"
2576 )
2577 return False
2578
2579 if "call" in rmap_dict:
2580 call_rmap = rmap_dict["call"]
2581 if call_rmap:
2582 rmap_data.append("call {}".format(call_rmap))
2583 else:
2584 logger.error(
2585 "In call, 'destination Route-Map' is" " not provided"
2586 )
2587 return False
2588
2589 # Verifying if SET criteria is defined
2590 if "set" in rmap_dict:
2591 set_data = rmap_dict["set"]
2592 ipv4_data = set_data.setdefault("ipv4", {})
2593 ipv6_data = set_data.setdefault("ipv6", {})
2594 local_preference = set_data.setdefault("locPrf", None)
2595 metric = set_data.setdefault("metric", None)
2596 metric_type = set_data.setdefault("metric-type", None)
2597 as_path = set_data.setdefault("path", {})
2598 weight = set_data.setdefault("weight", None)
2599 community = set_data.setdefault("community", {})
2600 large_community = set_data.setdefault("large_community", {})
2601 large_comm_list = set_data.setdefault("large_comm_list", {})
2602 set_action = set_data.setdefault("set_action", None)
2603 nexthop = set_data.setdefault("nexthop", None)
2604 origin = set_data.setdefault("origin", None)
2605 ext_comm_list = set_data.setdefault("extcommunity", {})
2606 metrictype = set_data.setdefault("metric-type", {})
2607
2608 # Local Preference
2609 if local_preference:
2610 rmap_data.append(
2611 "set local-preference {}".format(local_preference)
2612 )
2613
2614 # Metric-Type
2615 if metrictype:
2616 rmap_data.append("set metric-type {}\n".format(metrictype))
2617
2618 # Metric
2619 if metric:
2620 del_comm = set_data.setdefault("delete", None)
2621 if del_comm:
2622 rmap_data.append("no set metric {}".format(metric))
2623 else:
2624 rmap_data.append("set metric {}".format(metric))
2625
2626 # Origin
2627 if origin:
2628 rmap_data.append("set origin {} \n".format(origin))
2629
2630 # AS Path Prepend
2631 if as_path:
2632 as_num = as_path.setdefault("as_num", None)
2633 as_action = as_path.setdefault("as_action", None)
2634 if as_action and as_num:
2635 rmap_data.append(
2636 "set as-path {} {}".format(as_action, as_num)
2637 )
2638
2639 # Community
2640 if community:
2641 num = community.setdefault("num", None)
2642 comm_action = community.setdefault("action", None)
2643 if num:
2644 cmd = "set community {}".format(num)
2645 if comm_action:
2646 cmd = "{} {}".format(cmd, comm_action)
2647 rmap_data.append(cmd)
2648 else:
2649 logger.error("In community, AS Num not" " provided")
2650 return False
2651
2652 if large_community:
2653 num = large_community.setdefault("num", None)
2654 comm_action = large_community.setdefault("action", None)
2655 if num:
2656 cmd = "set large-community {}".format(num)
2657 if comm_action:
2658 cmd = "{} {}".format(cmd, comm_action)
2659
2660 rmap_data.append(cmd)
2661 else:
2662 logger.error(
2663 "In large_community, AS Num not" " provided"
2664 )
2665 return False
2666 if large_comm_list:
2667 id = large_comm_list.setdefault("id", None)
2668 del_comm = large_comm_list.setdefault("delete", None)
2669 if id:
2670 cmd = "set large-comm-list {}".format(id)
2671 if del_comm:
2672 cmd = "{} delete".format(cmd)
2673
2674 rmap_data.append(cmd)
2675 else:
2676 logger.error("In large_comm_list 'id' not" " provided")
2677 return False
2678
2679 if ext_comm_list:
2680 rt = ext_comm_list.setdefault("rt", None)
2681 del_comm = ext_comm_list.setdefault("delete", None)
2682 if rt:
2683 cmd = "set extcommunity rt {}".format(rt)
2684 if del_comm:
2685 cmd = "{} delete".format(cmd)
2686
2687 rmap_data.append(cmd)
2688 else:
2689 logger.debug("In ext_comm_list 'rt' not" " provided")
2690 return False
2691
2692 # Weight
2693 if weight:
2694 rmap_data.append("set weight {}".format(weight))
2695 if ipv6_data:
2696 nexthop = ipv6_data.setdefault("nexthop", None)
2697 if nexthop:
2698 rmap_data.append("set ipv6 next-hop {}".format(nexthop))
2699
2700 # Adding MATCH and SET sequence to RMAP if defined
2701 if "match" in rmap_dict:
2702 match_data = rmap_dict["match"]
2703 ipv4_data = match_data.setdefault("ipv4", {})
2704 ipv6_data = match_data.setdefault("ipv6", {})
2705 community = match_data.setdefault("community_list", {})
2706 large_community = match_data.setdefault("large_community", {})
2707 large_community_list = match_data.setdefault(
2708 "large_community_list", {}
2709 )
2710
2711 metric = match_data.setdefault("metric", None)
2712 source_vrf = match_data.setdefault("source-vrf", None)
2713
2714 if ipv4_data:
2715 # fetch prefix list data from rmap
2716 prefix_name = ipv4_data.setdefault("prefix_lists", None)
2717 if prefix_name:
2718 rmap_data.append(
2719 "match ip address"
2720 " prefix-list {}".format(prefix_name)
2721 )
2722
2723 # fetch tag data from rmap
2724 tag = ipv4_data.setdefault("tag", None)
2725 if tag:
2726 rmap_data.append("match tag {}".format(tag))
2727
2728 # fetch large community data from rmap
2729 large_community_list = ipv4_data.setdefault(
2730 "large_community_list", {}
2731 )
2732 large_community = match_data.setdefault(
2733 "large_community", {}
2734 )
2735
2736 if ipv6_data:
2737 prefix_name = ipv6_data.setdefault("prefix_lists", None)
2738 if prefix_name:
2739 rmap_data.append(
2740 "match ipv6 address"
2741 " prefix-list {}".format(prefix_name)
2742 )
2743
2744 # fetch tag data from rmap
2745 tag = ipv6_data.setdefault("tag", None)
2746 if tag:
2747 rmap_data.append("match tag {}".format(tag))
2748
2749 # fetch large community data from rmap
2750 large_community_list = ipv6_data.setdefault(
2751 "large_community_list", {}
2752 )
2753 large_community = match_data.setdefault(
2754 "large_community", {}
2755 )
2756
2757 if community:
2758 if "id" not in community:
2759 logger.error(
2760 "'id' is mandatory for "
2761 "community-list in match"
2762 " criteria"
2763 )
2764 return False
2765 cmd = "match community {}".format(community["id"])
2766 exact_match = community.setdefault("exact_match", False)
2767 if exact_match:
2768 cmd = "{} exact-match".format(cmd)
2769
2770 rmap_data.append(cmd)
2771 if large_community:
2772 if "id" not in large_community:
2773 logger.error(
2774 "'id' is mandatory for "
2775 "large-community-list in match "
2776 "criteria"
2777 )
2778 return False
2779 cmd = "match large-community {}".format(
2780 large_community["id"]
2781 )
2782 exact_match = large_community.setdefault(
2783 "exact_match", False
2784 )
2785 if exact_match:
2786 cmd = "{} exact-match".format(cmd)
2787 rmap_data.append(cmd)
2788 if large_community_list:
2789 if "id" not in large_community_list:
2790 logger.error(
2791 "'id' is mandatory for "
2792 "large-community-list in match "
2793 "criteria"
2794 )
2795 return False
2796 cmd = "match large-community {}".format(
2797 large_community_list["id"]
2798 )
2799 exact_match = large_community_list.setdefault(
2800 "exact_match", False
2801 )
2802 if exact_match:
2803 cmd = "{} exact-match".format(cmd)
2804 rmap_data.append(cmd)
2805
2806 if source_vrf:
2807 cmd = "match source-vrf {}".format(source_vrf)
2808 rmap_data.append(cmd)
2809
2810 if metric:
2811 cmd = "match metric {}".format(metric)
2812 rmap_data.append(cmd)
2813
2814 if rmap_data:
2815 rmap_data_dict[router] = rmap_data
2816
2817 result = create_common_configurations(
2818 tgen, rmap_data_dict, "route_maps", build=build
2819 )
2820
2821 except InvalidCLIError:
2822 # Traceback
2823 errormsg = traceback.format_exc()
2824 logger.error(errormsg)
2825 return errormsg
2826
2827 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
2828 return result
2829
2830
2831 def delete_route_maps(tgen, input_dict):
2832 """
2833 Delete ip route maps from device
2834 * `tgen` : Topogen object
2835 * `input_dict` : for which router,
2836 route map has to be deleted
2837 Usage
2838 -----
2839 # Delete route-map rmap_1 and rmap_2 from router r1
2840 input_dict = {
2841 "r1": {
2842 "route_maps": ["rmap_1", "rmap__2"]
2843 }
2844 }
2845 result = delete_route_maps("ipv4", input_dict)
2846 Returns
2847 -------
2848 errormsg(str) or True
2849 """
2850 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
2851
2852 for router in input_dict.keys():
2853 route_maps = input_dict[router]["route_maps"][:]
2854 rmap_data = input_dict[router]
2855 rmap_data["route_maps"] = {}
2856 for route_map_name in route_maps:
2857 rmap_data["route_maps"].update({route_map_name: [{"delete": True}]})
2858
2859 return create_route_maps(tgen, input_dict)
2860
2861
2862 def create_bgp_community_lists(tgen, input_dict, build=False):
2863 """
2864 Create bgp community-list or large-community-list on the devices as per
2865 the arguments passed. Takes list of communities in input.
2866 Parameters
2867 ----------
2868 * `tgen` : Topogen object
2869 * `input_dict` : Input dict data, required when configuring from testcase
2870 * `build` : Only for initial setup phase this is set as True.
2871 Usage
2872 -----
2873 input_dict_1 = {
2874 "r3": {
2875 "bgp_community_lists": [
2876 {
2877 "community_type": "standard",
2878 "action": "permit",
2879 "name": "rmap_lcomm_{}".format(addr_type),
2880 "value": "1:1:1 1:2:3 2:1:1 2:2:2",
2881 "large": True
2882 }
2883 ]
2884 }
2885 }
2886 }
2887 result = create_bgp_community_lists(tgen, input_dict_1)
2888 """
2889
2890 result = False
2891 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
2892 input_dict = deepcopy(input_dict)
2893 try:
2894 config_data_dict = {}
2895
2896 for router in input_dict.keys():
2897 if "bgp_community_lists" not in input_dict[router]:
2898 errormsg = "bgp_community_lists not present in input_dict"
2899 logger.debug(errormsg)
2900 continue
2901
2902 config_data = []
2903
2904 community_list = input_dict[router]["bgp_community_lists"]
2905 for community_dict in community_list:
2906 del_action = community_dict.setdefault("delete", False)
2907 community_type = community_dict.setdefault("community_type", None)
2908 action = community_dict.setdefault("action", None)
2909 value = community_dict.setdefault("value", "")
2910 large = community_dict.setdefault("large", None)
2911 name = community_dict.setdefault("name", None)
2912 if large:
2913 cmd = "bgp large-community-list"
2914 else:
2915 cmd = "bgp community-list"
2916
2917 if not large and not (community_type and action and value):
2918 errormsg = (
2919 "community_type, action and value are "
2920 "required in bgp_community_list"
2921 )
2922 logger.error(errormsg)
2923 return False
2924
2925 cmd = "{} {} {} {} {}".format(cmd, community_type, name, action, value)
2926
2927 if del_action:
2928 cmd = "no {}".format(cmd)
2929
2930 config_data.append(cmd)
2931
2932 if config_data:
2933 config_data_dict[router] = config_data
2934
2935 result = create_common_configurations(
2936 tgen, config_data_dict, "bgp_community_list", build=build
2937 )
2938
2939 except InvalidCLIError:
2940 # Traceback
2941 errormsg = traceback.format_exc()
2942 logger.error(errormsg)
2943 return errormsg
2944
2945 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
2946 return result
2947
2948
2949 def shutdown_bringup_interface(tgen, dut, intf_name, ifaceaction=False):
2950 """
2951 Shutdown or bringup router's interface "
2952 * `tgen` : Topogen object
2953 * `dut` : Device under test
2954 * `intf_name` : Interface name to be shut/no shut
2955 * `ifaceaction` : Action, to shut/no shut interface,
2956 by default is False
2957 Usage
2958 -----
2959 dut = "r3"
2960 intf = "r3-r1-eth0"
2961 # Shut down interface
2962 shutdown_bringup_interface(tgen, dut, intf, False)
2963 # Bring up interface
2964 shutdown_bringup_interface(tgen, dut, intf, True)
2965 Returns
2966 -------
2967 errormsg(str) or True
2968 """
2969
2970 router_list = tgen.routers()
2971 if ifaceaction:
2972 logger.info("Bringing up interface {} : {}".format(dut, intf_name))
2973 else:
2974 logger.info("Shutting down interface {} : {}".format(dut, intf_name))
2975
2976 interface_set_status(router_list[dut], intf_name, ifaceaction)
2977
2978
2979 def addKernelRoute(
2980 tgen, router, intf, group_addr_range, next_hop=None, src=None, del_action=None
2981 ):
2982 """
2983 Add route to kernel
2984
2985 Parameters:
2986 -----------
2987 * `tgen` : Topogen object
2988 * `router`: router for which kernel routes needs to be added
2989 * `intf`: interface name, for which kernel routes needs to be added
2990 * `bindToAddress`: bind to <host>, an interface or multicast
2991 address
2992
2993 returns:
2994 --------
2995 errormsg or True
2996 """
2997
2998 logger.debug("Entering lib API: addKernelRoute()")
2999
3000 rnode = tgen.gears[router]
3001
3002 if type(group_addr_range) is not list:
3003 group_addr_range = [group_addr_range]
3004
3005 for grp_addr in group_addr_range:
3006
3007 addr_type = validate_ip_address(grp_addr)
3008 if addr_type == "ipv4":
3009 if next_hop is not None:
3010 cmd = "ip route add {} via {}".format(grp_addr, next_hop)
3011 else:
3012 cmd = "ip route add {} dev {}".format(grp_addr, intf)
3013 if del_action:
3014 cmd = "ip route del {}".format(grp_addr)
3015 verify_cmd = "ip route"
3016 elif addr_type == "ipv6":
3017 if intf and src:
3018 cmd = "ip -6 route add {} dev {} src {}".format(grp_addr, intf, src)
3019 else:
3020 cmd = "ip -6 route add {} via {}".format(grp_addr, next_hop)
3021 verify_cmd = "ip -6 route"
3022 if del_action:
3023 cmd = "ip -6 route del {}".format(grp_addr)
3024
3025 logger.info("[DUT: {}]: Running command: [{}]".format(router, cmd))
3026 output = rnode.run(cmd)
3027
3028 def check_in_kernel(rnode, verify_cmd, grp_addr, router):
3029 # Verifying if ip route added to kernel
3030 errormsg = None
3031 result = rnode.run(verify_cmd)
3032 logger.debug("{}\n{}".format(verify_cmd, result))
3033 if "/" in grp_addr:
3034 ip, mask = grp_addr.split("/")
3035 if mask == "32" or mask == "128":
3036 grp_addr = ip
3037 else:
3038 mask = "32" if addr_type == "ipv4" else "128"
3039
3040 if not re_search(r"{}".format(grp_addr), result) and mask != "0":
3041 errormsg = (
3042 "[DUT: {}]: Kernal route is not added for group"
3043 " address {} Config output: {}".format(
3044 router, grp_addr, output
3045 )
3046 )
3047
3048 return errormsg
3049
3050 test_func = functools.partial(
3051 check_in_kernel, rnode, verify_cmd, grp_addr, router
3052 )
3053 (result, out) = topotest.run_and_expect(test_func, None, count=20, wait=1)
3054 assert result, out
3055
3056 logger.debug("Exiting lib API: addKernelRoute()")
3057 return True
3058
3059
3060 def configure_vxlan(tgen, input_dict):
3061 """
3062 Add and configure vxlan
3063
3064 * `tgen`: tgen object
3065 * `input_dict` : data for vxlan config
3066
3067 Usage:
3068 ------
3069 input_dict= {
3070 "dcg2":{
3071 "vxlan":[{
3072 "vxlan_name": "vxlan75100",
3073 "vxlan_id": "75100",
3074 "dstport": 4789,
3075 "local_addr": "120.0.0.1",
3076 "learning": "no",
3077 "delete": True
3078 }]
3079 }
3080 }
3081
3082 configure_vxlan(tgen, input_dict)
3083
3084 Returns:
3085 -------
3086 True or errormsg
3087
3088 """
3089
3090 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
3091
3092 router_list = tgen.routers()
3093 for dut in input_dict.keys():
3094 rnode = router_list[dut]
3095
3096 if "vxlan" in input_dict[dut]:
3097 for vxlan_dict in input_dict[dut]["vxlan"]:
3098 cmd = "ip link "
3099
3100 del_vxlan = vxlan_dict.setdefault("delete", None)
3101 vxlan_names = vxlan_dict.setdefault("vxlan_name", [])
3102 vxlan_ids = vxlan_dict.setdefault("vxlan_id", [])
3103 dstport = vxlan_dict.setdefault("dstport", None)
3104 local_addr = vxlan_dict.setdefault("local_addr", None)
3105 learning = vxlan_dict.setdefault("learning", None)
3106
3107 config_data = []
3108 if vxlan_names and vxlan_ids:
3109 for vxlan_name, vxlan_id in zip(vxlan_names, vxlan_ids):
3110 cmd = "ip link"
3111
3112 if del_vxlan:
3113 cmd = "{} del {} type vxlan id {}".format(
3114 cmd, vxlan_name, vxlan_id
3115 )
3116 else:
3117 cmd = "{} add {} type vxlan id {}".format(
3118 cmd, vxlan_name, vxlan_id
3119 )
3120
3121 if dstport:
3122 cmd = "{} dstport {}".format(cmd, dstport)
3123
3124 if local_addr:
3125 ip_cmd = "ip addr add {} dev {}".format(
3126 local_addr, vxlan_name
3127 )
3128 if del_vxlan:
3129 ip_cmd = "ip addr del {} dev {}".format(
3130 local_addr, vxlan_name
3131 )
3132
3133 config_data.append(ip_cmd)
3134
3135 cmd = "{} local {}".format(cmd, local_addr)
3136
3137 if learning == "no":
3138 cmd = "{} nolearning".format(cmd)
3139
3140 elif learning == "yes":
3141 cmd = "{} learning".format(cmd)
3142
3143 config_data.append(cmd)
3144
3145 try:
3146 for _cmd in config_data:
3147 logger.info("[DUT: %s]: Running command: %s", dut, _cmd)
3148 rnode.run(_cmd)
3149
3150 except InvalidCLIError:
3151 # Traceback
3152 errormsg = traceback.format_exc()
3153 logger.error(errormsg)
3154 return errormsg
3155
3156 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
3157
3158 return True
3159
3160
3161 def configure_brctl(tgen, topo, input_dict):
3162 """
3163 Add and configure brctl
3164
3165 * `tgen`: tgen object
3166 * `input_dict` : data for brctl config
3167
3168 Usage:
3169 ------
3170 input_dict= {
3171 dut:{
3172 "brctl": [{
3173 "brctl_name": "br100",
3174 "addvxlan": "vxlan75100",
3175 "vrf": "RED",
3176 "stp": "off"
3177 }]
3178 }
3179 }
3180
3181 configure_brctl(tgen, topo, input_dict)
3182
3183 Returns:
3184 -------
3185 True or errormsg
3186
3187 """
3188
3189 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
3190
3191 router_list = tgen.routers()
3192 for dut in input_dict.keys():
3193 rnode = router_list[dut]
3194
3195 if "brctl" in input_dict[dut]:
3196 for brctl_dict in input_dict[dut]["brctl"]:
3197
3198 brctl_names = brctl_dict.setdefault("brctl_name", [])
3199 addvxlans = brctl_dict.setdefault("addvxlan", [])
3200 stp_values = brctl_dict.setdefault("stp", [])
3201 vrfs = brctl_dict.setdefault("vrf", [])
3202
3203 ip_cmd = "ip link set"
3204 for brctl_name, vxlan, vrf, stp in zip(
3205 brctl_names, addvxlans, vrfs, stp_values
3206 ):
3207
3208 ip_cmd_list = []
3209 cmd = "ip link add name {} type bridge stp_state {}".format(
3210 brctl_name, stp
3211 )
3212
3213 logger.info("[DUT: %s]: Running command: %s", dut, cmd)
3214 rnode.run(cmd)
3215
3216 ip_cmd_list.append("{} up dev {}".format(ip_cmd, brctl_name))
3217
3218 if vxlan:
3219 cmd = "{} dev {} master {}".format(ip_cmd, vxlan, brctl_name)
3220
3221 logger.info("[DUT: %s]: Running command: %s", dut, cmd)
3222 rnode.run(cmd)
3223
3224 ip_cmd_list.append("{} up dev {}".format(ip_cmd, vxlan))
3225
3226 if vrf:
3227 ip_cmd_list.append(
3228 "{} dev {} master {}".format(ip_cmd, brctl_name, vrf)
3229 )
3230
3231 for intf_name, data in topo["routers"][dut]["links"].items():
3232 if "vrf" not in data:
3233 continue
3234
3235 if data["vrf"] == vrf:
3236 ip_cmd_list.append(
3237 "{} up dev {}".format(ip_cmd, data["interface"])
3238 )
3239
3240 try:
3241 for _ip_cmd in ip_cmd_list:
3242 logger.info("[DUT: %s]: Running command: %s", dut, _ip_cmd)
3243 rnode.run(_ip_cmd)
3244
3245 except InvalidCLIError:
3246 # Traceback
3247 errormsg = traceback.format_exc()
3248 logger.error(errormsg)
3249 return errormsg
3250
3251 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
3252 return True
3253
3254
3255 def configure_interface_mac(tgen, input_dict):
3256 """
3257 Add and configure brctl
3258
3259 * `tgen`: tgen object
3260 * `input_dict` : data for mac config
3261
3262 input_mac= {
3263 "edge1":{
3264 "br75100": "00:80:48:BA:d1:00,
3265 "br75200": "00:80:48:BA:d1:00
3266 }
3267 }
3268
3269 configure_interface_mac(tgen, input_mac)
3270
3271 Returns:
3272 -------
3273 True or errormsg
3274
3275 """
3276
3277 router_list = tgen.routers()
3278 for dut in input_dict.keys():
3279 rnode = router_list[dut]
3280
3281 for intf, mac in input_dict[dut].items():
3282 cmd = "ip link set {} address {}".format(intf, mac)
3283 logger.info("[DUT: %s]: Running command: %s", dut, cmd)
3284
3285 try:
3286 result = rnode.run(cmd)
3287 if len(result) != 0:
3288 return result
3289
3290 except InvalidCLIError:
3291 # Traceback
3292 errormsg = traceback.format_exc()
3293 logger.error(errormsg)
3294 return errormsg
3295
3296 return True
3297
3298
3299 def socat_send_mld_join(
3300 tgen,
3301 server,
3302 protocol_option,
3303 mld_groups,
3304 send_from_intf,
3305 send_from_intf_ip=None,
3306 port=12345,
3307 reuseaddr=True,
3308 ):
3309 """
3310 API to send MLD join using SOCAT tool
3311
3312 Parameters:
3313 -----------
3314 * `tgen` : Topogen object
3315 * `server`: iperf server, from where IGMP join would be sent
3316 * `protocol_option`: Protocol options, ex: UDP6-RECV
3317 * `mld_groups`: IGMP group for which join has to be sent
3318 * `send_from_intf`: Interface from which join would be sent
3319 * `send_from_intf_ip`: Interface IP, default is None
3320 * `port`: Port to be used, default is 12345
3321 * `reuseaddr`: True|False, bydefault True
3322
3323 returns:
3324 --------
3325 errormsg or True
3326 """
3327
3328 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
3329
3330 rnode = tgen.routers()[server]
3331 socat_args = "socat -u "
3332
3333 # UDP4/TCP4/UDP6/UDP6-RECV/UDP6-SEND
3334 if protocol_option:
3335 socat_args += "{}".format(protocol_option)
3336
3337 if port:
3338 socat_args += ":{},".format(port)
3339
3340 if reuseaddr:
3341 socat_args += "{},".format("reuseaddr")
3342
3343 # Group address range to cover
3344 if mld_groups:
3345 if not isinstance(mld_groups, list):
3346 mld_groups = [mld_groups]
3347
3348 for mld_group in mld_groups:
3349 socat_cmd = socat_args
3350 join_option = "ipv6-join-group"
3351
3352 if send_from_intf and not send_from_intf_ip:
3353 socat_cmd += "{}='[{}]:{}'".format(join_option, mld_group, send_from_intf)
3354 else:
3355 socat_cmd += "{}='[{}]:{}:[{}]'".format(
3356 join_option, mld_group, send_from_intf, send_from_intf_ip
3357 )
3358
3359 socat_cmd += " STDOUT"
3360
3361 socat_cmd += " &>{}/socat.logs &".format(tgen.logdir)
3362
3363 # Run socat command to send IGMP join
3364 logger.info("[DUT: {}]: Running command: [{}]".format(server, socat_cmd))
3365 output = rnode.run("set +m; {} sleep 0.5".format(socat_cmd))
3366
3367 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
3368 return True
3369
3370
3371 def socat_send_pim6_traffic(
3372 tgen,
3373 server,
3374 protocol_option,
3375 mld_groups,
3376 send_from_intf,
3377 port=12345,
3378 multicast_hops=True,
3379 ):
3380 """
3381 API to send pim6 data taffic using SOCAT tool
3382
3383 Parameters:
3384 -----------
3385 * `tgen` : Topogen object
3386 * `server`: iperf server, from where IGMP join would be sent
3387 * `protocol_option`: Protocol options, ex: UDP6-RECV
3388 * `mld_groups`: MLD group for which join has to be sent
3389 * `send_from_intf`: Interface from which join would be sent
3390 * `port`: Port to be used, default is 12345
3391 * `multicast_hops`: multicast-hops count, default is 255
3392
3393 returns:
3394 --------
3395 errormsg or True
3396 """
3397
3398 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
3399
3400 rnode = tgen.routers()[server]
3401 socat_args = "socat -u STDIO "
3402
3403 # UDP4/TCP4/UDP6/UDP6-RECV/UDP6-SEND
3404 if protocol_option:
3405 socat_args += "'{}".format(protocol_option)
3406
3407 # Group address range to cover
3408 if mld_groups:
3409 if not isinstance(mld_groups, list):
3410 mld_groups = [mld_groups]
3411
3412 for mld_group in mld_groups:
3413 socat_cmd = socat_args
3414 if port:
3415 socat_cmd += ":[{}]:{},".format(mld_group, port)
3416
3417 if send_from_intf:
3418 socat_cmd += "interface={0},so-bindtodevice={0},".format(send_from_intf)
3419
3420 if multicast_hops:
3421 socat_cmd += "multicast-hops=255'"
3422
3423 socat_cmd += " &>{}/socat.logs &".format(tgen.logdir)
3424
3425 # Run socat command to send pim6 traffic
3426 logger.info(
3427 "[DUT: {}]: Running command: [set +m; ( while sleep 1; do date; done ) | {}]".format(
3428 server, socat_cmd
3429 )
3430 )
3431
3432 # Open a shell script file and write data to it, which will be
3433 # used to send pim6 traffic continously
3434 traffic_shell_script = "{}/{}/traffic.sh".format(tgen.logdir, server)
3435 with open("{}".format(traffic_shell_script), "w") as taffic_sh:
3436 taffic_sh.write(
3437 "#!/usr/bin/env bash\n( while sleep 1; do date; done ) | {}\n".format(
3438 socat_cmd
3439 )
3440 )
3441
3442 rnode.run("chmod 755 {}".format(traffic_shell_script))
3443 output = rnode.run("{} &> /dev/null".format(traffic_shell_script))
3444
3445 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
3446 return True
3447
3448
3449 def kill_socat(tgen, dut=None, action=None):
3450 """
3451 Killing socat process if running for any router in topology
3452
3453 Parameters:
3454 -----------
3455 * `tgen` : Topogen object
3456 * `dut` : Any iperf hostname to send igmp prune
3457 * `action`: to kill mld join using socat
3458 to kill mld traffic using socat
3459
3460 Usage:
3461 ------
3462 kill_socat(tgen, dut ="i6", action="remove_mld_join")
3463
3464 """
3465
3466 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
3467
3468 router_list = tgen.routers()
3469 for router, rnode in router_list.items():
3470 if dut is not None and router != dut:
3471 continue
3472
3473 if action == "remove_mld_join":
3474 cmd = "ps -ef | grep socat | grep UDP6-RECV | grep {}".format(router)
3475 elif action == "remove_mld_traffic":
3476 cmd = "ps -ef | grep socat | grep UDP6-SEND | grep {}".format(router)
3477 else:
3478 cmd = "ps -ef | grep socat".format(router)
3479
3480 awk_cmd = "awk -F' ' '{print $2}' | xargs kill -9 &>/dev/null &"
3481 cmd = "{} | {}".format(cmd, awk_cmd)
3482
3483 logger.debug("[DUT: {}]: Running command: [{}]".format(router, cmd))
3484 rnode.run(cmd)
3485
3486 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
3487
3488
3489 #############################################
3490 # Verification APIs
3491 #############################################
3492 @retry(retry_timeout=40)
3493 def verify_rib(
3494 tgen,
3495 addr_type,
3496 dut,
3497 input_dict,
3498 next_hop=None,
3499 protocol=None,
3500 tag=None,
3501 metric=None,
3502 fib=None,
3503 count_only=False,
3504 admin_distance=None,
3505 ):
3506 """
3507 Data will be read from input_dict or input JSON file, API will generate
3508 same prefixes, which were redistributed by either create_static_routes() or
3509 advertise_networks_using_network_command() and do will verify next_hop and
3510 each prefix/routes is present in "show ip/ipv6 route {bgp/stataic} json"
3511 command o/p.
3512
3513 Parameters
3514 ----------
3515 * `tgen` : topogen object
3516 * `addr_type` : ip type, ipv4/ipv6
3517 * `dut`: Device Under Test, for which user wants to test the data
3518 * `input_dict` : input dict, has details of static routes
3519 * `next_hop`[optional]: next_hop which needs to be verified,
3520 default: static
3521 * `protocol`[optional]: protocol, default = None
3522 * `count_only`[optional]: count of nexthops only, not specific addresses,
3523 default = False
3524
3525 Usage
3526 -----
3527 # RIB can be verified for static routes OR network advertised using
3528 network command. Following are input_dicts to create static routes
3529 and advertise networks using network command. Any one of the input_dict
3530 can be passed to verify_rib() to verify routes in DUT"s RIB.
3531
3532 # Creating static routes for r1
3533 input_dict = {
3534 "r1": {
3535 "static_routes": [{"network": "10.0.20.1/32", "no_of_ip": 9, \
3536 "admin_distance": 100, "next_hop": "10.0.0.2", "tag": 4001}]
3537 }}
3538 # Advertising networks using network command in router r1
3539 input_dict = {
3540 "r1": {
3541 "advertise_networks": [{"start_ip": "20.0.0.0/32",
3542 "no_of_network": 10},
3543 {"start_ip": "30.0.0.0/32"}]
3544 }}
3545 # Verifying ipv4 routes in router r1 learned via BGP
3546 dut = "r2"
3547 protocol = "bgp"
3548 result = verify_rib(tgen, "ipv4", dut, input_dict, protocol = protocol)
3549
3550 Returns
3551 -------
3552 errormsg(str) or True
3553 """
3554
3555 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
3556
3557 router_list = tgen.routers()
3558 additional_nexthops_in_required_nhs = []
3559 found_hops = []
3560 for routerInput in input_dict.keys():
3561 for router, rnode in router_list.items():
3562 if router != dut:
3563 continue
3564
3565 logger.info("Checking router %s RIB:", router)
3566
3567 # Verifying RIB routes
3568 if addr_type == "ipv4":
3569 command = "show ip route"
3570 else:
3571 command = "show ipv6 route"
3572
3573 found_routes = []
3574 missing_routes = []
3575
3576 if "static_routes" in input_dict[routerInput]:
3577 static_routes = input_dict[routerInput]["static_routes"]
3578
3579 for static_route in static_routes:
3580 if "vrf" in static_route and static_route["vrf"] is not None:
3581
3582 logger.info(
3583 "[DUT: {}]: Verifying routes for VRF:"
3584 " {}".format(router, static_route["vrf"])
3585 )
3586
3587 cmd = "{} vrf {}".format(command, static_route["vrf"])
3588
3589 else:
3590 cmd = "{}".format(command)
3591
3592 if protocol:
3593 cmd = "{} {}".format(cmd, protocol)
3594
3595 cmd = "{} json".format(cmd)
3596
3597 rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True)
3598
3599 # Verifying output dictionary rib_routes_json is not empty
3600 if bool(rib_routes_json) is False:
3601 errormsg = "No route found in rib of router {}..".format(router)
3602 return errormsg
3603
3604 network = static_route["network"]
3605 if "no_of_ip" in static_route:
3606 no_of_ip = static_route["no_of_ip"]
3607 else:
3608 no_of_ip = 1
3609
3610 if "tag" in static_route:
3611 _tag = static_route["tag"]
3612 else:
3613 _tag = None
3614
3615 # Generating IPs for verification
3616 ip_list = generate_ips(network, no_of_ip)
3617 st_found = False
3618 nh_found = False
3619
3620 for st_rt in ip_list:
3621 st_rt = str(
3622 ipaddress.ip_network(frr_unicode(st_rt), strict=False)
3623 )
3624 _addr_type = validate_ip_address(st_rt)
3625 if _addr_type != addr_type:
3626 continue
3627
3628 if st_rt in rib_routes_json:
3629 st_found = True
3630 found_routes.append(st_rt)
3631
3632 if "queued" in rib_routes_json[st_rt][0]:
3633 errormsg = "Route {} is queued\n".format(st_rt)
3634 return errormsg
3635
3636 if fib and next_hop:
3637 if type(next_hop) is not list:
3638 next_hop = [next_hop]
3639
3640 for mnh in range(0, len(rib_routes_json[st_rt])):
3641 if not "selected" in rib_routes_json[st_rt][mnh]:
3642 continue
3643
3644 if (
3645 "fib"
3646 in rib_routes_json[st_rt][mnh]["nexthops"][0]
3647 ):
3648 found_hops.append(
3649 [
3650 rib_r["ip"]
3651 for rib_r in rib_routes_json[st_rt][
3652 mnh
3653 ]["nexthops"]
3654 ]
3655 )
3656
3657 if found_hops[0]:
3658 missing_list_of_nexthops = set(
3659 found_hops[0]
3660 ).difference(next_hop)
3661 additional_nexthops_in_required_nhs = set(
3662 next_hop
3663 ).difference(found_hops[0])
3664
3665 if additional_nexthops_in_required_nhs:
3666 logger.info(
3667 "Nexthop "
3668 "%s is not active for route %s in "
3669 "RIB of router %s\n",
3670 additional_nexthops_in_required_nhs,
3671 st_rt,
3672 dut,
3673 )
3674 errormsg = (
3675 "Nexthop {} is not active"
3676 " for route {} in RIB of router"
3677 " {}\n".format(
3678 additional_nexthops_in_required_nhs,
3679 st_rt,
3680 dut,
3681 )
3682 )
3683 return errormsg
3684 else:
3685 nh_found = True
3686
3687 elif next_hop and fib is None:
3688 if type(next_hop) is not list:
3689 next_hop = [next_hop]
3690 found_hops = [
3691 rib_r["ip"]
3692 for rib_r in rib_routes_json[st_rt][0]["nexthops"]
3693 if "ip" in rib_r
3694 ]
3695
3696 # If somehow key "ip" is not found in nexthops JSON
3697 # then found_hops would be 0, this particular
3698 # situation will be handled here
3699 if not len(found_hops):
3700 errormsg = (
3701 "Nexthop {} is Missing for "
3702 "route {} in RIB of router {}\n".format(
3703 next_hop,
3704 st_rt,
3705 dut,
3706 )
3707 )
3708 return errormsg
3709
3710 # Check only the count of nexthops
3711 if count_only:
3712 if len(next_hop) == len(found_hops):
3713 nh_found = True
3714 else:
3715 errormsg = (
3716 "Nexthops are missing for "
3717 "route {} in RIB of router {}: "
3718 "expected {}, found {}\n".format(
3719 st_rt,
3720 dut,
3721 len(next_hop),
3722 len(found_hops),
3723 )
3724 )
3725 return errormsg
3726
3727 # Check the actual nexthops
3728 elif found_hops:
3729 missing_list_of_nexthops = set(
3730 found_hops
3731 ).difference(next_hop)
3732 additional_nexthops_in_required_nhs = set(
3733 next_hop
3734 ).difference(found_hops)
3735
3736 if additional_nexthops_in_required_nhs:
3737 logger.info(
3738 "Missing nexthop %s for route"
3739 " %s in RIB of router %s\n",
3740 additional_nexthops_in_required_nhs,
3741 st_rt,
3742 dut,
3743 )
3744 errormsg = (
3745 "Nexthop {} is Missing for "
3746 "route {} in RIB of router {}\n".format(
3747 additional_nexthops_in_required_nhs,
3748 st_rt,
3749 dut,
3750 )
3751 )
3752 return errormsg
3753 else:
3754 nh_found = True
3755
3756 if tag:
3757 if "tag" not in rib_routes_json[st_rt][0]:
3758 errormsg = (
3759 "[DUT: {}]: tag is not"
3760 " present for"
3761 " route {} in RIB \n".format(dut, st_rt)
3762 )
3763 return errormsg
3764
3765 if _tag != rib_routes_json[st_rt][0]["tag"]:
3766 errormsg = (
3767 "[DUT: {}]: tag value {}"
3768 " is not matched for"
3769 " route {} in RIB \n".format(
3770 dut,
3771 _tag,
3772 st_rt,
3773 )
3774 )
3775 return errormsg
3776
3777 if admin_distance is not None:
3778 if "distance" not in rib_routes_json[st_rt][0]:
3779 errormsg = (
3780 "[DUT: {}]: admin distance is"
3781 " not present for"
3782 " route {} in RIB \n".format(dut, st_rt)
3783 )
3784 return errormsg
3785
3786 if (
3787 admin_distance
3788 != rib_routes_json[st_rt][0]["distance"]
3789 ):
3790 errormsg = (
3791 "[DUT: {}]: admin distance value "
3792 "{} is not matched for "
3793 "route {} in RIB \n".format(
3794 dut,
3795 admin_distance,
3796 st_rt,
3797 )
3798 )
3799 return errormsg
3800
3801 if metric is not None:
3802 if "metric" not in rib_routes_json[st_rt][0]:
3803 errormsg = (
3804 "[DUT: {}]: metric is"
3805 " not present for"
3806 " route {} in RIB \n".format(dut, st_rt)
3807 )
3808 return errormsg
3809
3810 if metric != rib_routes_json[st_rt][0]["metric"]:
3811 errormsg = (
3812 "[DUT: {}]: metric value "
3813 "{} is not matched for "
3814 "route {} in RIB \n".format(
3815 dut,
3816 metric,
3817 st_rt,
3818 )
3819 )
3820 return errormsg
3821
3822 else:
3823 missing_routes.append(st_rt)
3824
3825 if nh_found:
3826 logger.info(
3827 "[DUT: {}]: Found next_hop {} for"
3828 " RIB routes: {}".format(router, next_hop, found_routes)
3829 )
3830
3831 if len(missing_routes) > 0:
3832 errormsg = "[DUT: {}]: Missing route in RIB, " "routes: {}".format(
3833 dut, missing_routes
3834 )
3835 return errormsg
3836
3837 if found_routes:
3838 logger.info(
3839 "[DUT: %s]: Verified routes in RIB, found" " routes are: %s\n",
3840 dut,
3841 found_routes,
3842 )
3843
3844 continue
3845
3846 if "bgp" in input_dict[routerInput]:
3847 if (
3848 "advertise_networks"
3849 not in input_dict[routerInput]["bgp"]["address_family"][addr_type][
3850 "unicast"
3851 ]
3852 ):
3853 continue
3854
3855 found_routes = []
3856 missing_routes = []
3857 advertise_network = input_dict[routerInput]["bgp"]["address_family"][
3858 addr_type
3859 ]["unicast"]["advertise_networks"]
3860
3861 # Continue if there are no network advertise
3862 if len(advertise_network) == 0:
3863 continue
3864
3865 for advertise_network_dict in advertise_network:
3866 if "vrf" in advertise_network_dict:
3867 cmd = "{} vrf {} json".format(
3868 command, advertise_network_dict["vrf"]
3869 )
3870 else:
3871 cmd = "{} json".format(command)
3872
3873 rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True)
3874
3875 # Verifying output dictionary rib_routes_json is not empty
3876 if bool(rib_routes_json) is False:
3877 errormsg = "No route found in rib of router {}..".format(router)
3878 return errormsg
3879
3880 start_ip = advertise_network_dict["network"]
3881 if "no_of_network" in advertise_network_dict:
3882 no_of_network = advertise_network_dict["no_of_network"]
3883 else:
3884 no_of_network = 1
3885
3886 # Generating IPs for verification
3887 ip_list = generate_ips(start_ip, no_of_network)
3888 st_found = False
3889 nh_found = False
3890
3891 for st_rt in ip_list:
3892 st_rt = str(ipaddress.ip_network(frr_unicode(st_rt), strict=False))
3893
3894 _addr_type = validate_ip_address(st_rt)
3895 if _addr_type != addr_type:
3896 continue
3897
3898 if st_rt in rib_routes_json:
3899 st_found = True
3900 found_routes.append(st_rt)
3901
3902 if "queued" in rib_routes_json[st_rt][0]:
3903 errormsg = "Route {} is queued\n".format(st_rt)
3904 return errormsg
3905
3906 if next_hop:
3907 if type(next_hop) is not list:
3908 next_hop = [next_hop]
3909
3910 count = 0
3911 for nh in next_hop:
3912 for nh_dict in rib_routes_json[st_rt][0]["nexthops"]:
3913 if nh_dict["ip"] != nh:
3914 continue
3915 else:
3916 count += 1
3917
3918 if count == len(next_hop):
3919 nh_found = True
3920 else:
3921 errormsg = (
3922 "Nexthop {} is Missing"
3923 " for route {} in "
3924 "RIB of router {}\n".format(next_hop, st_rt, dut)
3925 )
3926 return errormsg
3927 else:
3928 missing_routes.append(st_rt)
3929
3930 if nh_found:
3931 logger.info(
3932 "Found next_hop {} for all routes in RIB"
3933 " of router {}\n".format(next_hop, dut)
3934 )
3935
3936 if len(missing_routes) > 0:
3937 errormsg = (
3938 "Missing {} route in RIB of router {}, "
3939 "routes: {} \n".format(addr_type, dut, missing_routes)
3940 )
3941 return errormsg
3942
3943 if found_routes:
3944 logger.info(
3945 "Verified {} routes in router {} RIB, found"
3946 " routes are: {}\n".format(addr_type, dut, found_routes)
3947 )
3948
3949 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
3950 return True
3951
3952
3953 @retry(retry_timeout=12)
3954 def verify_fib_routes(tgen, addr_type, dut, input_dict, next_hop=None, protocol=None):
3955 """
3956 Data will be read from input_dict or input JSON file, API will generate
3957 same prefixes, which were redistributed by either create_static_routes() or
3958 advertise_networks_using_network_command() and will verify next_hop and
3959 each prefix/routes is present in "show ip/ipv6 fib json"
3960 command o/p.
3961
3962 Parameters
3963 ----------
3964 * `tgen` : topogen object
3965 * `addr_type` : ip type, ipv4/ipv6
3966 * `dut`: Device Under Test, for which user wants to test the data
3967 * `input_dict` : input dict, has details of static routes
3968 * `next_hop`[optional]: next_hop which needs to be verified,
3969 default: static
3970
3971 Usage
3972 -----
3973 input_routes_r1 = {
3974 "r1": {
3975 "static_routes": [{
3976 "network": ["1.1.1.1/32],
3977 "next_hop": "Null0",
3978 "vrf": "RED"
3979 }]
3980 }
3981 }
3982 result = result = verify_fib_routes(tgen, "ipv4, "r1", input_routes_r1)
3983
3984 Returns
3985 -------
3986 errormsg(str) or True
3987 """
3988
3989 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
3990
3991 router_list = tgen.routers()
3992 if dut not in router_list:
3993 return
3994
3995 for routerInput in input_dict.keys():
3996 # XXX replace with router = dut; rnode = router_list[dut]
3997 for router, rnode in router_list.items():
3998 if router != dut:
3999 continue
4000
4001 logger.info("Checking router %s FIB routes:", router)
4002
4003 # Verifying RIB routes
4004 if addr_type == "ipv4":
4005 command = "show ip fib"
4006 else:
4007 command = "show ipv6 fib"
4008
4009 found_routes = []
4010 missing_routes = []
4011
4012 if protocol:
4013 command = "{} {}".format(command, protocol)
4014
4015 if "static_routes" in input_dict[routerInput]:
4016 static_routes = input_dict[routerInput]["static_routes"]
4017
4018 for static_route in static_routes:
4019 if "vrf" in static_route and static_route["vrf"] is not None:
4020
4021 logger.info(
4022 "[DUT: {}]: Verifying routes for VRF:"
4023 " {}".format(router, static_route["vrf"])
4024 )
4025
4026 cmd = "{} vrf {}".format(command, static_route["vrf"])
4027
4028 else:
4029 cmd = "{}".format(command)
4030
4031 cmd = "{} json".format(cmd)
4032
4033 rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True)
4034
4035 # Verifying output dictionary rib_routes_json is not empty
4036 if bool(rib_routes_json) is False:
4037 errormsg = "[DUT: {}]: No route found in fib".format(router)
4038 return errormsg
4039
4040 network = static_route["network"]
4041 if "no_of_ip" in static_route:
4042 no_of_ip = static_route["no_of_ip"]
4043 else:
4044 no_of_ip = 1
4045
4046 # Generating IPs for verification
4047 ip_list = generate_ips(network, no_of_ip)
4048 st_found = False
4049 nh_found = False
4050
4051 for st_rt in ip_list:
4052 st_rt = str(
4053 ipaddress.ip_network(frr_unicode(st_rt), strict=False)
4054 )
4055 _addr_type = validate_ip_address(st_rt)
4056 if _addr_type != addr_type:
4057 continue
4058
4059 if st_rt in rib_routes_json:
4060 st_found = True
4061 found_routes.append(st_rt)
4062
4063 if next_hop:
4064 if type(next_hop) is not list:
4065 next_hop = [next_hop]
4066
4067 count = 0
4068 for nh in next_hop:
4069 for nh_dict in rib_routes_json[st_rt][0][
4070 "nexthops"
4071 ]:
4072 if nh_dict["ip"] != nh:
4073 continue
4074 else:
4075 count += 1
4076
4077 if count == len(next_hop):
4078 nh_found = True
4079 else:
4080 missing_routes.append(st_rt)
4081 errormsg = (
4082 "Nexthop {} is Missing"
4083 " for route {} in "
4084 "RIB of router {}\n".format(
4085 next_hop, st_rt, dut
4086 )
4087 )
4088 return errormsg
4089
4090 else:
4091 missing_routes.append(st_rt)
4092
4093 if len(missing_routes) > 0:
4094 errormsg = "[DUT: {}]: Missing route in FIB:" " {}".format(
4095 dut, missing_routes
4096 )
4097 return errormsg
4098
4099 if nh_found:
4100 logger.info(
4101 "Found next_hop {} for all routes in RIB"
4102 " of router {}\n".format(next_hop, dut)
4103 )
4104
4105 if found_routes:
4106 logger.info(
4107 "[DUT: %s]: Verified routes in FIB, found" " routes are: %s\n",
4108 dut,
4109 found_routes,
4110 )
4111
4112 continue
4113
4114 if "bgp" in input_dict[routerInput]:
4115 if (
4116 "advertise_networks"
4117 not in input_dict[routerInput]["bgp"]["address_family"][addr_type][
4118 "unicast"
4119 ]
4120 ):
4121 continue
4122
4123 found_routes = []
4124 missing_routes = []
4125 advertise_network = input_dict[routerInput]["bgp"]["address_family"][
4126 addr_type
4127 ]["unicast"]["advertise_networks"]
4128
4129 # Continue if there are no network advertise
4130 if len(advertise_network) == 0:
4131 continue
4132
4133 for advertise_network_dict in advertise_network:
4134 if "vrf" in advertise_network_dict:
4135 cmd = "{} vrf {} json".format(command, static_route["vrf"])
4136 else:
4137 cmd = "{} json".format(command)
4138
4139 rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True)
4140
4141 # Verifying output dictionary rib_routes_json is not empty
4142 if bool(rib_routes_json) is False:
4143 errormsg = "No route found in rib of router {}..".format(router)
4144 return errormsg
4145
4146 start_ip = advertise_network_dict["network"]
4147 if "no_of_network" in advertise_network_dict:
4148 no_of_network = advertise_network_dict["no_of_network"]
4149 else:
4150 no_of_network = 1
4151
4152 # Generating IPs for verification
4153 ip_list = generate_ips(start_ip, no_of_network)
4154 st_found = False
4155 nh_found = False
4156
4157 for st_rt in ip_list:
4158 st_rt = str(ipaddress.ip_network(frr_unicode(st_rt), strict=False))
4159
4160 _addr_type = validate_ip_address(st_rt)
4161 if _addr_type != addr_type:
4162 continue
4163
4164 if st_rt in rib_routes_json:
4165 st_found = True
4166 found_routes.append(st_rt)
4167
4168 if next_hop:
4169 if type(next_hop) is not list:
4170 next_hop = [next_hop]
4171
4172 count = 0
4173 for nh in next_hop:
4174 for nh_dict in rib_routes_json[st_rt][0]["nexthops"]:
4175 if nh_dict["ip"] != nh:
4176 continue
4177 else:
4178 count += 1
4179
4180 if count == len(next_hop):
4181 nh_found = True
4182 else:
4183 missing_routes.append(st_rt)
4184 errormsg = (
4185 "Nexthop {} is Missing"
4186 " for route {} in "
4187 "RIB of router {}\n".format(next_hop, st_rt, dut)
4188 )
4189 return errormsg
4190 else:
4191 missing_routes.append(st_rt)
4192
4193 if len(missing_routes) > 0:
4194 errormsg = "[DUT: {}]: Missing route in FIB: " "{} \n".format(
4195 dut, missing_routes
4196 )
4197 return errormsg
4198
4199 if nh_found:
4200 logger.info(
4201 "Found next_hop {} for all routes in RIB"
4202 " of router {}\n".format(next_hop, dut)
4203 )
4204
4205 if found_routes:
4206 logger.info(
4207 "[DUT: {}]: Verified routes FIB"
4208 ", found routes are: {}\n".format(dut, found_routes)
4209 )
4210
4211 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
4212 return True
4213
4214
4215 def verify_admin_distance_for_static_routes(tgen, input_dict):
4216 """
4217 API to verify admin distance for static routes as defined in input_dict/
4218 input JSON by running show ip/ipv6 route json command.
4219 Parameter
4220 ---------
4221 * `tgen` : topogen object
4222 * `input_dict`: having details like - for which router and static routes
4223 admin dsitance needs to be verified
4224 Usage
4225 -----
4226 # To verify admin distance is 10 for prefix 10.0.20.1/32 having next_hop
4227 10.0.0.2 in router r1
4228 input_dict = {
4229 "r1": {
4230 "static_routes": [{
4231 "network": "10.0.20.1/32",
4232 "admin_distance": 10,
4233 "next_hop": "10.0.0.2"
4234 }]
4235 }
4236 }
4237 result = verify_admin_distance_for_static_routes(tgen, input_dict)
4238 Returns
4239 -------
4240 errormsg(str) or True
4241 """
4242
4243 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
4244
4245 router_list = tgen.routers()
4246 for router in input_dict.keys():
4247 if router not in router_list:
4248 continue
4249 rnode = router_list[router]
4250
4251 for static_route in input_dict[router]["static_routes"]:
4252 addr_type = validate_ip_address(static_route["network"])
4253 # Command to execute
4254 if addr_type == "ipv4":
4255 command = "show ip route json"
4256 else:
4257 command = "show ipv6 route json"
4258 show_ip_route_json = run_frr_cmd(rnode, command, isjson=True)
4259
4260 logger.info(
4261 "Verifying admin distance for static route %s" " under dut %s:",
4262 static_route,
4263 router,
4264 )
4265 network = static_route["network"]
4266 next_hop = static_route["next_hop"]
4267 admin_distance = static_route["admin_distance"]
4268 route_data = show_ip_route_json[network][0]
4269 if network in show_ip_route_json:
4270 if route_data["nexthops"][0]["ip"] == next_hop:
4271 if route_data["distance"] != admin_distance:
4272 errormsg = (
4273 "Verification failed: admin distance"
4274 " for static route {} under dut {},"
4275 " found:{} but expected:{}".format(
4276 static_route,
4277 router,
4278 route_data["distance"],
4279 admin_distance,
4280 )
4281 )
4282 return errormsg
4283 else:
4284 logger.info(
4285 "Verification successful: admin"
4286 " distance for static route %s under"
4287 " dut %s, found:%s",
4288 static_route,
4289 router,
4290 route_data["distance"],
4291 )
4292
4293 else:
4294 errormsg = (
4295 "Static route {} not found in "
4296 "show_ip_route_json for dut {}".format(network, router)
4297 )
4298 return errormsg
4299
4300 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
4301 return True
4302
4303
4304 def verify_prefix_lists(tgen, input_dict):
4305 """
4306 Running "show ip prefix-list" command and verifying given prefix-list
4307 is present in router.
4308 Parameters
4309 ----------
4310 * `tgen` : topogen object
4311 * `input_dict`: data to verify prefix lists
4312 Usage
4313 -----
4314 # To verify pf_list_1 is present in router r1
4315 input_dict = {
4316 "r1": {
4317 "prefix_lists": ["pf_list_1"]
4318 }}
4319 result = verify_prefix_lists("ipv4", input_dict, tgen)
4320 Returns
4321 -------
4322 errormsg(str) or True
4323 """
4324
4325 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
4326
4327 router_list = tgen.routers()
4328 for router in input_dict.keys():
4329 if router not in router_list:
4330 continue
4331
4332 rnode = router_list[router]
4333
4334 # Show ip prefix list
4335 show_prefix_list = run_frr_cmd(rnode, "show ip prefix-list")
4336
4337 # Verify Prefix list is deleted
4338 prefix_lists_addr = input_dict[router]["prefix_lists"]
4339 for addr_type in prefix_lists_addr:
4340 if not check_address_types(addr_type):
4341 continue
4342 # show ip prefix list
4343 if addr_type == "ipv4":
4344 cmd = "show ip prefix-list"
4345 else:
4346 cmd = "show {} prefix-list".format(addr_type)
4347 show_prefix_list = run_frr_cmd(rnode, cmd)
4348 for prefix_list in prefix_lists_addr[addr_type].keys():
4349 if prefix_list in show_prefix_list:
4350 errormsg = (
4351 "Prefix list {} is/are present in the router"
4352 " {}".format(prefix_list, router)
4353 )
4354 return errormsg
4355
4356 logger.info(
4357 "Prefix list %s is/are not present in the router" " from router %s",
4358 prefix_list,
4359 router,
4360 )
4361
4362 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
4363 return True
4364
4365
4366 @retry(retry_timeout=12)
4367 def verify_route_maps(tgen, input_dict):
4368 """
4369 Running "show route-map" command and verifying given route-map
4370 is present in router.
4371 Parameters
4372 ----------
4373 * `tgen` : topogen object
4374 * `input_dict`: data to verify prefix lists
4375 Usage
4376 -----
4377 # To verify rmap_1 and rmap_2 are present in router r1
4378 input_dict = {
4379 "r1": {
4380 "route_maps": ["rmap_1", "rmap_2"]
4381 }
4382 }
4383 result = verify_route_maps(tgen, input_dict)
4384 Returns
4385 -------
4386 errormsg(str) or True
4387 """
4388
4389 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
4390
4391 router_list = tgen.routers()
4392 for router in input_dict.keys():
4393 if router not in router_list:
4394 continue
4395
4396 rnode = router_list[router]
4397 # Show ip route-map
4398 show_route_maps = rnode.vtysh_cmd("show route-map")
4399
4400 # Verify route-map is deleted
4401 route_maps = input_dict[router]["route_maps"]
4402 for route_map in route_maps:
4403 if route_map in show_route_maps:
4404 errormsg = "Route map {} is not deleted from router" " {}".format(
4405 route_map, router
4406 )
4407 return errormsg
4408
4409 logger.info(
4410 "Route map %s is/are deleted successfully from" " router %s",
4411 route_maps,
4412 router,
4413 )
4414
4415 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
4416 return True
4417
4418
4419 @retry(retry_timeout=16)
4420 def verify_bgp_community(tgen, addr_type, router, network, input_dict=None):
4421 """
4422 API to veiryf BGP large community is attached in route for any given
4423 DUT by running "show bgp ipv4/6 {route address} json" command.
4424 Parameters
4425 ----------
4426 * `tgen`: topogen object
4427 * `addr_type` : ip type, ipv4/ipv6
4428 * `dut`: Device Under Test
4429 * `network`: network for which set criteria needs to be verified
4430 * `input_dict`: having details like - for which router, community and
4431 values needs to be verified
4432 Usage
4433 -----
4434 networks = ["200.50.2.0/32"]
4435 input_dict = {
4436 "largeCommunity": "2:1:1 2:2:2 2:3:3 2:4:4 2:5:5"
4437 }
4438 result = verify_bgp_community(tgen, "ipv4", dut, network, input_dict=None)
4439 Returns
4440 -------
4441 errormsg(str) or True
4442 """
4443
4444 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
4445 router_list = tgen.routers()
4446 if router not in router_list:
4447 return False
4448
4449 rnode = router_list[router]
4450
4451 logger.debug(
4452 "Verifying BGP community attributes on dut %s: for %s " "network %s",
4453 router,
4454 addr_type,
4455 network,
4456 )
4457
4458 for net in network:
4459 cmd = "show bgp {} {} json".format(addr_type, net)
4460 show_bgp_json = rnode.vtysh_cmd(cmd, isjson=True)
4461 logger.info(show_bgp_json)
4462 if "paths" not in show_bgp_json:
4463 return "Prefix {} not found in BGP table of router: {}".format(net, router)
4464
4465 as_paths = show_bgp_json["paths"]
4466 found = False
4467 for i in range(len(as_paths)):
4468 if (
4469 "largeCommunity" in show_bgp_json["paths"][i]
4470 or "community" in show_bgp_json["paths"][i]
4471 ):
4472 found = True
4473 logger.info(
4474 "Large Community attribute is found for route:" " %s in router: %s",
4475 net,
4476 router,
4477 )
4478 if input_dict is not None:
4479 for criteria, comm_val in input_dict.items():
4480 show_val = show_bgp_json["paths"][i][criteria]["string"]
4481 if comm_val == show_val:
4482 logger.info(
4483 "Verifying BGP %s for prefix: %s"
4484 " in router: %s, found expected"
4485 " value: %s",
4486 criteria,
4487 net,
4488 router,
4489 comm_val,
4490 )
4491 else:
4492 errormsg = (
4493 "Failed: Verifying BGP attribute"
4494 " {} for route: {} in router: {}"
4495 ", expected value: {} but found"
4496 ": {}".format(criteria, net, router, comm_val, show_val)
4497 )
4498 return errormsg
4499
4500 if not found:
4501 errormsg = (
4502 "Large Community attribute is not found for route: "
4503 "{} in router: {} ".format(net, router)
4504 )
4505 return errormsg
4506
4507 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
4508 return True
4509
4510
4511 def get_ipv6_linklocal_address(topo, node, intf):
4512 """
4513 API to get the link local ipv6 address of a particular interface
4514
4515 Parameters
4516 ----------
4517 * `node`: node on which link local ip to be fetched.
4518 * `intf` : interface for which link local ip needs to be returned.
4519 * `topo` : base topo
4520
4521 Usage
4522 -----
4523 result = get_ipv6_linklocal_address(topo, 'r1', 'r2')
4524
4525 Returns link local ip of interface between r1 and r2.
4526
4527 Returns
4528 -------
4529 1) link local ipv6 address from the interface
4530 2) errormsg - when link local ip not found
4531 """
4532 tgen = get_topogen()
4533 ext_nh = tgen.net[node].get_ipv6_linklocal()
4534 req_nh = topo[node]["links"][intf]["interface"]
4535 llip = None
4536 for llips in ext_nh:
4537 if llips[0] == req_nh:
4538 llip = llips[1]
4539 logger.info("Link local ip found = %s", llip)
4540 return llip
4541
4542 errormsg = "Failed: Link local ip not found on router {}, " "interface {}".format(
4543 node, intf
4544 )
4545
4546 return errormsg
4547
4548
4549 def verify_create_community_list(tgen, input_dict):
4550 """
4551 API is to verify if large community list is created for any given DUT in
4552 input_dict by running "sh bgp large-community-list {"comm_name"} detail"
4553 command.
4554 Parameters
4555 ----------
4556 * `tgen`: topogen object
4557 * `input_dict`: having details like - for which router, large community
4558 needs to be verified
4559 Usage
4560 -----
4561 input_dict = {
4562 "r1": {
4563 "large-community-list": {
4564 "standard": {
4565 "Test1": [{"action": "PERMIT", "attribute":\
4566 ""}]
4567 }}}}
4568 result = verify_create_community_list(tgen, input_dict)
4569 Returns
4570 -------
4571 errormsg(str) or True
4572 """
4573
4574 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
4575
4576 router_list = tgen.routers()
4577 for router in input_dict.keys():
4578 if router not in router_list:
4579 continue
4580
4581 rnode = router_list[router]
4582
4583 logger.info("Verifying large-community is created for dut %s:", router)
4584
4585 for comm_data in input_dict[router]["bgp_community_lists"]:
4586 comm_name = comm_data["name"]
4587 comm_type = comm_data["community_type"]
4588 show_bgp_community = run_frr_cmd(
4589 rnode, "show bgp large-community-list {} detail".format(comm_name)
4590 )
4591
4592 # Verify community list and type
4593 if comm_name in show_bgp_community and comm_type in show_bgp_community:
4594 logger.info(
4595 "BGP %s large-community-list %s is" " created", comm_type, comm_name
4596 )
4597 else:
4598 errormsg = "BGP {} large-community-list {} is not" " created".format(
4599 comm_type, comm_name
4600 )
4601 return errormsg
4602
4603 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
4604 return True
4605
4606
4607 def verify_cli_json(tgen, input_dict):
4608 """
4609 API to verify if JSON is available for clis
4610 command.
4611 Parameters
4612 ----------
4613 * `tgen`: topogen object
4614 * `input_dict`: CLIs for which JSON needs to be verified
4615 Usage
4616 -----
4617 input_dict = {
4618 "edge1":{
4619 "cli": ["show evpn vni detail", show evpn rmac vni all]
4620 }
4621 }
4622
4623 result = verify_cli_json(tgen, input_dict)
4624
4625 Returns
4626 -------
4627 errormsg(str) or True
4628 """
4629
4630 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
4631 for dut in input_dict.keys():
4632 rnode = tgen.gears[dut]
4633
4634 for cli in input_dict[dut]["cli"]:
4635 logger.info(
4636 "[DUT: %s]: Verifying JSON is available for " "CLI %s :", dut, cli
4637 )
4638
4639 test_cli = "{} json".format(cli)
4640 ret_json = rnode.vtysh_cmd(test_cli, isjson=True)
4641 if not bool(ret_json):
4642 errormsg = "CLI: %s, JSON format is not available" % (cli)
4643 return errormsg
4644 elif "unknown" in ret_json or "Unknown" in ret_json:
4645 errormsg = "CLI: %s, JSON format is not available" % (cli)
4646 return errormsg
4647 else:
4648 logger.info(
4649 "CLI : %s JSON format is available: " "\n %s", cli, ret_json
4650 )
4651
4652 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
4653
4654 return True
4655
4656
4657 @retry(retry_timeout=12)
4658 def verify_evpn_vni(tgen, input_dict):
4659 """
4660 API to verify evpn vni details using "show evpn vni detail json"
4661 command.
4662
4663 Parameters
4664 ----------
4665 * `tgen`: topogen object
4666 * `input_dict`: having details like - for which router, evpn details
4667 needs to be verified
4668 Usage
4669 -----
4670 input_dict = {
4671 "edge1":{
4672 "vni": [
4673 {
4674 "75100":{
4675 "vrf": "RED",
4676 "vxlanIntf": "vxlan75100",
4677 "localVtepIp": "120.1.1.1",
4678 "sviIntf": "br100"
4679 }
4680 }
4681 ]
4682 }
4683 }
4684
4685 result = verify_evpn_vni(tgen, input_dict)
4686
4687 Returns
4688 -------
4689 errormsg(str) or True
4690 """
4691
4692 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
4693 for dut in input_dict.keys():
4694 rnode = tgen.gears[dut]
4695
4696 logger.info("[DUT: %s]: Verifying evpn vni details :", dut)
4697
4698 cmd = "show evpn vni detail json"
4699 evpn_all_vni_json = run_frr_cmd(rnode, cmd, isjson=True)
4700 if not bool(evpn_all_vni_json):
4701 errormsg = "No output for '{}' cli".format(cmd)
4702 return errormsg
4703
4704 if "vni" in input_dict[dut]:
4705 for vni_dict in input_dict[dut]["vni"]:
4706 found = False
4707 vni = vni_dict["name"]
4708 for evpn_vni_json in evpn_all_vni_json:
4709 if "vni" in evpn_vni_json:
4710 if evpn_vni_json["vni"] != int(vni):
4711 continue
4712
4713 for attribute in vni_dict.keys():
4714 if vni_dict[attribute] != evpn_vni_json[attribute]:
4715 errormsg = (
4716 "[DUT: %s] Verifying "
4717 "%s for VNI: %s [FAILED]||"
4718 ", EXPECTED : %s "
4719 " FOUND : %s"
4720 % (
4721 dut,
4722 attribute,
4723 vni,
4724 vni_dict[attribute],
4725 evpn_vni_json[attribute],
4726 )
4727 )
4728 return errormsg
4729
4730 else:
4731 found = True
4732 logger.info(
4733 "[DUT: %s] Verifying"
4734 " %s for VNI: %s , "
4735 "Found Expected : %s ",
4736 dut,
4737 attribute,
4738 vni,
4739 evpn_vni_json[attribute],
4740 )
4741
4742 if evpn_vni_json["state"] != "Up":
4743 errormsg = (
4744 "[DUT: %s] Failed: Verifying"
4745 " State for VNI: %s is not Up" % (dut, vni)
4746 )
4747 return errormsg
4748
4749 else:
4750 errormsg = (
4751 "[DUT: %s] Failed:"
4752 " VNI: %s is not present in JSON" % (dut, vni)
4753 )
4754 return errormsg
4755
4756 if found:
4757 logger.info(
4758 "[DUT %s]: Verifying VNI : %s "
4759 "details and state is Up [PASSED]!!",
4760 dut,
4761 vni,
4762 )
4763 return True
4764
4765 else:
4766 errormsg = (
4767 "[DUT: %s] Failed:" " vni details are not present in input data" % (dut)
4768 )
4769 return errormsg
4770
4771 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
4772 return False
4773
4774
4775 @retry(retry_timeout=12)
4776 def verify_vrf_vni(tgen, input_dict):
4777 """
4778 API to verify vrf vni details using "show vrf vni json"
4779 command.
4780 Parameters
4781 ----------
4782 * `tgen`: topogen object
4783 * `input_dict`: having details like - for which router, evpn details
4784 needs to be verified
4785 Usage
4786 -----
4787 input_dict = {
4788 "edge1":{
4789 "vrfs": [
4790 {
4791 "RED":{
4792 "vni": 75000,
4793 "vxlanIntf": "vxlan75100",
4794 "sviIntf": "br100",
4795 "routerMac": "00:80:48:ba:d1:00",
4796 "state": "Up"
4797 }
4798 }
4799 ]
4800 }
4801 }
4802
4803 result = verify_vrf_vni(tgen, input_dict)
4804
4805 Returns
4806 -------
4807 errormsg(str) or True
4808 """
4809
4810 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
4811 for dut in input_dict.keys():
4812 rnode = tgen.gears[dut]
4813
4814 logger.info("[DUT: %s]: Verifying vrf vni details :", dut)
4815
4816 cmd = "show vrf vni json"
4817 vrf_all_vni_json = run_frr_cmd(rnode, cmd, isjson=True)
4818 if not bool(vrf_all_vni_json):
4819 errormsg = "No output for '{}' cli".format(cmd)
4820 return errormsg
4821
4822 if "vrfs" in input_dict[dut]:
4823 for vrfs in input_dict[dut]["vrfs"]:
4824 for vrf, vrf_dict in vrfs.items():
4825 found = False
4826 for vrf_vni_json in vrf_all_vni_json["vrfs"]:
4827 if "vrf" in vrf_vni_json:
4828 if vrf_vni_json["vrf"] != vrf:
4829 continue
4830
4831 for attribute in vrf_dict.keys():
4832 if vrf_dict[attribute] == vrf_vni_json[attribute]:
4833 found = True
4834 logger.info(
4835 "[DUT %s]: VRF: %s, "
4836 "verifying %s "
4837 ", Found Expected: %s "
4838 "[PASSED]!!",
4839 dut,
4840 vrf,
4841 attribute,
4842 vrf_vni_json[attribute],
4843 )
4844 else:
4845 errormsg = (
4846 "[DUT: %s] VRF: %s, "
4847 "verifying %s [FAILED!!] "
4848 ", EXPECTED : %s "
4849 ", FOUND : %s"
4850 % (
4851 dut,
4852 vrf,
4853 attribute,
4854 vrf_dict[attribute],
4855 vrf_vni_json[attribute],
4856 )
4857 )
4858 return errormsg
4859
4860 else:
4861 errormsg = "[DUT: %s] VRF: %s " "is not present in JSON" % (
4862 dut,
4863 vrf,
4864 )
4865 return errormsg
4866
4867 if found:
4868 logger.info(
4869 "[DUT %s] Verifying VRF: %s " " details [PASSED]!!",
4870 dut,
4871 vrf,
4872 )
4873 return True
4874
4875 else:
4876 errormsg = (
4877 "[DUT: %s] Failed:" " vrf details are not present in input data" % (dut)
4878 )
4879 return errormsg
4880
4881 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
4882 return False
4883
4884
4885 def required_linux_kernel_version(required_version):
4886 """
4887 This API is used to check linux version compatibility of the test suite.
4888 If version mentioned in required_version is higher than the linux kernel
4889 of the system, test suite will be skipped. This API returns true or errormsg.
4890
4891 Parameters
4892 ----------
4893 * `required_version` : Kernel version required for the suites to run.
4894
4895 Usage
4896 -----
4897 result = linux_kernel_version_lowerthan('4.15')
4898
4899 Returns
4900 -------
4901 errormsg(str) or True
4902 """
4903 system_kernel = platform.release()
4904 if version_cmp(system_kernel, required_version) < 0:
4905 error_msg = (
4906 'These tests will not run on kernel "{}", '
4907 "they require kernel >= {})".format(system_kernel, required_version)
4908 )
4909
4910 logger.info(error_msg)
4911
4912 return error_msg
4913 return True
4914
4915
4916 class HostApplicationHelper(object):
4917 """Helper to track and cleanup per-host based test processes."""
4918
4919 def __init__(self, tgen=None, base_cmd=None):
4920 self.base_cmd_str = ""
4921 self.host_procs = {}
4922 self.tgen = None
4923 self.set_base_cmd(base_cmd if base_cmd else [])
4924 if tgen is not None:
4925 self.init(tgen)
4926
4927 def __enter__(self):
4928 self.init()
4929 return self
4930
4931 def __exit__(self, type, value, traceback):
4932 self.cleanup()
4933
4934 def __str__(self):
4935 return "HostApplicationHelper({})".format(self.base_cmd_str)
4936
4937 def set_base_cmd(self, base_cmd):
4938 assert isinstance(base_cmd, list) or isinstance(base_cmd, tuple)
4939 self.base_cmd = base_cmd
4940 if base_cmd:
4941 self.base_cmd_str = " ".join(base_cmd)
4942 else:
4943 self.base_cmd_str = ""
4944
4945 def init(self, tgen=None):
4946 """Initialize the helper with tgen if needed.
4947
4948 If overridden, need to handle multiple entries but one init. Will be called on
4949 object creation if tgen is supplied. Will be called again on __enter__ so should
4950 not re-init if already inited.
4951 """
4952 if self.tgen:
4953 assert tgen is None or self.tgen == tgen
4954 else:
4955 self.tgen = tgen
4956
4957 def started_proc(self, host, p):
4958 """Called after process started on host.
4959
4960 Return value is passed to `stopping_proc` method."""
4961 logger.debug("%s: Doing nothing after starting process", self)
4962 return False
4963
4964 def stopping_proc(self, host, p, info):
4965 """Called after process started on host."""
4966 logger.debug("%s: Doing nothing before stopping process", self)
4967
4968 def _add_host_proc(self, host, p):
4969 v = self.started_proc(host, p)
4970
4971 if host not in self.host_procs:
4972 self.host_procs[host] = []
4973 logger.debug("%s: %s: tracking process %s", self, host, p)
4974 self.host_procs[host].append((p, v))
4975
4976 def stop_host(self, host):
4977 """Stop the process on the host.
4978
4979 Override to do additional cleanup."""
4980 if host in self.host_procs:
4981 hlogger = self.tgen.net[host].logger
4982 for p, v in self.host_procs[host]:
4983 self.stopping_proc(host, p, v)
4984 logger.debug("%s: %s: terminating process %s", self, host, p.pid)
4985 hlogger.debug("%s: %s: terminating process %s", self, host, p.pid)
4986 rc = p.poll()
4987 if rc is not None:
4988 logger.error(
4989 "%s: %s: process early exit %s: %s",
4990 self,
4991 host,
4992 p.pid,
4993 comm_error(p),
4994 )
4995 hlogger.error(
4996 "%s: %s: process early exit %s: %s",
4997 self,
4998 host,
4999 p.pid,
5000 comm_error(p),
5001 )
5002 else:
5003 p.terminate()
5004 p.wait()
5005 logger.debug(
5006 "%s: %s: terminated process %s: %s",
5007 self,
5008 host,
5009 p.pid,
5010 comm_error(p),
5011 )
5012 hlogger.debug(
5013 "%s: %s: terminated process %s: %s",
5014 self,
5015 host,
5016 p.pid,
5017 comm_error(p),
5018 )
5019
5020 del self.host_procs[host]
5021
5022 def stop_all_hosts(self):
5023 hosts = set(self.host_procs)
5024 for host in hosts:
5025 self.stop_host(host)
5026
5027 def cleanup(self):
5028 self.stop_all_hosts()
5029
5030 def run(self, host, cmd_args, **kwargs):
5031 cmd = list(self.base_cmd)
5032 cmd.extend(cmd_args)
5033 p = self.tgen.gears[host].popen(cmd, **kwargs)
5034 assert p.poll() is None
5035 self._add_host_proc(host, p)
5036 return p
5037
5038 def check_procs(self):
5039 """Check that all current processes are running, log errors if not.
5040
5041 Returns: List of stopped processes."""
5042 procs = []
5043
5044 logger.debug("%s: checking procs on hosts %s", self, self.host_procs.keys())
5045
5046 for host in self.host_procs:
5047 hlogger = self.tgen.net[host].logger
5048 for p, _ in self.host_procs[host]:
5049 logger.debug("%s: checking %s proc %s", self, host, p)
5050 rc = p.poll()
5051 if rc is None:
5052 continue
5053 logger.error(
5054 "%s: %s proc exited: %s", self, host, comm_error(p), exc_info=True
5055 )
5056 hlogger.error(
5057 "%s: %s proc exited: %s", self, host, comm_error(p), exc_info=True
5058 )
5059 procs.append(p)
5060 return procs
5061
5062
5063 class IPerfHelper(HostApplicationHelper):
5064 def __str__(self):
5065 return "IPerfHelper()"
5066
5067 def run_join(
5068 self,
5069 host,
5070 join_addr,
5071 l4Type="UDP",
5072 join_interval=1,
5073 join_intf=None,
5074 join_towards=None,
5075 ):
5076 """
5077 Use iperf to send IGMP join and listen to traffic
5078
5079 Parameters:
5080 -----------
5081 * `host`: iperf host from where IGMP join would be sent
5082 * `l4Type`: string, one of [ TCP, UDP ]
5083 * `join_addr`: multicast address (or addresses) to join to
5084 * `join_interval`: seconds between periodic bandwidth reports
5085 * `join_intf`: the interface to bind the join to
5086 * `join_towards`: router whos interface to bind the join to
5087
5088 returns: Success (bool)
5089 """
5090
5091 iperf_path = self.tgen.net.get_exec_path("iperf")
5092
5093 assert join_addr
5094 if not isinstance(join_addr, list) and not isinstance(join_addr, tuple):
5095 join_addr = [ipaddress.IPv4Address(frr_unicode(join_addr))]
5096
5097 for bindTo in join_addr:
5098 iperf_args = [iperf_path, "-s"]
5099
5100 if l4Type == "UDP":
5101 iperf_args.append("-u")
5102
5103 iperf_args.append("-B")
5104 if join_towards:
5105 to_intf = frr_unicode(
5106 self.tgen.json_topo["routers"][host]["links"][join_towards][
5107 "interface"
5108 ]
5109 )
5110 iperf_args.append("{}%{}".format(str(bindTo), to_intf))
5111 elif join_intf:
5112 iperf_args.append("{}%{}".format(str(bindTo), join_intf))
5113 else:
5114 iperf_args.append(str(bindTo))
5115
5116 if join_interval:
5117 iperf_args.append("-i")
5118 iperf_args.append(str(join_interval))
5119
5120 p = self.run(host, iperf_args)
5121 if p.poll() is not None:
5122 logger.error("IGMP join failed on %s: %s", bindTo, comm_error(p))
5123 return False
5124 return True
5125
5126 def run_traffic(
5127 self, host, sentToAddress, ttl, time=0, l4Type="UDP", bind_towards=None
5128 ):
5129 """
5130 Run iperf to send IGMP join and traffic
5131
5132 Parameters:
5133 -----------
5134 * `host`: iperf host to send traffic from
5135 * `l4Type`: string, one of [ TCP, UDP ]
5136 * `sentToAddress`: multicast address to send traffic to
5137 * `ttl`: time to live
5138 * `time`: time in seconds to transmit for
5139 * `bind_towards`: Router who's interface the source ip address is got from
5140
5141 returns: Success (bool)
5142 """
5143
5144 iperf_path = self.tgen.net.get_exec_path("iperf")
5145
5146 if sentToAddress and not isinstance(sentToAddress, list):
5147 sentToAddress = [ipaddress.IPv4Address(frr_unicode(sentToAddress))]
5148
5149 for sendTo in sentToAddress:
5150 iperf_args = [iperf_path, "-c", sendTo]
5151
5152 # Bind to Interface IP
5153 if bind_towards:
5154 ifaddr = frr_unicode(
5155 self.tgen.json_topo["routers"][host]["links"][bind_towards]["ipv4"]
5156 )
5157 ipaddr = ipaddress.IPv4Interface(ifaddr).ip
5158 iperf_args.append("-B")
5159 iperf_args.append(str(ipaddr))
5160
5161 # UDP/TCP
5162 if l4Type == "UDP":
5163 iperf_args.append("-u")
5164 iperf_args.append("-b")
5165 iperf_args.append("0.012m")
5166
5167 # TTL
5168 if ttl:
5169 iperf_args.append("-T")
5170 iperf_args.append(str(ttl))
5171
5172 # Time
5173 if time:
5174 iperf_args.append("-t")
5175 iperf_args.append(str(time))
5176
5177 p = self.run(host, iperf_args)
5178 if p.poll() is not None:
5179 logger.error(
5180 "mcast traffic send failed for %s: %s", sendTo, comm_error(p)
5181 )
5182 return False
5183
5184 return True
5185
5186
5187 def verify_ip_nht(tgen, input_dict):
5188 """
5189 Running "show ip nht" command and verifying given nexthop resolution
5190 Parameters
5191 ----------
5192 * `tgen` : topogen object
5193 * `input_dict`: data to verify nexthop
5194 Usage
5195 -----
5196 input_dict_4 = {
5197 "r1": {
5198 nh: {
5199 "Address": nh,
5200 "resolvedVia": "connected",
5201 "nexthops": {
5202 "nexthop1": {
5203 "Interface": intf
5204 }
5205 }
5206 }
5207 }
5208 }
5209 result = verify_ip_nht(tgen, input_dict_4)
5210 Returns
5211 -------
5212 errormsg(str) or True
5213 """
5214
5215 logger.debug("Entering lib API: verify_ip_nht()")
5216
5217 router_list = tgen.routers()
5218 for router in input_dict.keys():
5219 if router not in router_list:
5220 continue
5221
5222 rnode = router_list[router]
5223 nh_list = input_dict[router]
5224
5225 if validate_ip_address(next(iter(nh_list))) == "ipv6":
5226 show_ip_nht = run_frr_cmd(rnode, "show ipv6 nht")
5227 else:
5228 show_ip_nht = run_frr_cmd(rnode, "show ip nht")
5229
5230 for nh in nh_list:
5231 if nh in show_ip_nht:
5232 nht = run_frr_cmd(rnode, "show ip nht {}".format(nh))
5233 if "unresolved" in nht:
5234 errormsg = "Nexthop {} became unresolved on {}".format(nh, router)
5235 return errormsg
5236 else:
5237 logger.info("Nexthop %s is resolved on %s", nh, router)
5238 return True
5239 else:
5240 errormsg = "Nexthop {} is resolved on {}".format(nh, router)
5241 return errormsg
5242
5243 logger.debug("Exiting lib API: verify_ip_nht()")
5244 return False
5245
5246
5247 def scapy_send_raw_packet(tgen, topo, senderRouter, intf, packet=None):
5248 """
5249 Using scapy Raw() method to send BSR raw packet from one FRR
5250 to other
5251
5252 Parameters:
5253 -----------
5254 * `tgen` : Topogen object
5255 * `topo` : json file data
5256 * `senderRouter` : Sender router
5257 * `packet` : packet in raw format
5258
5259 returns:
5260 --------
5261 errormsg or True
5262 """
5263
5264 global CD
5265 result = ""
5266 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
5267 sender_interface = intf
5268 rnode = tgen.routers()[senderRouter]
5269
5270 for destLink, data in topo["routers"][senderRouter]["links"].items():
5271 if "type" in data and data["type"] == "loopback":
5272 continue
5273
5274 if not packet:
5275 packet = topo["routers"][senderRouter]["pkt"]["test_packets"][packet][
5276 "data"
5277 ]
5278
5279 python3_path = tgen.net.get_exec_path(["python3", "python"])
5280 script_path = os.path.join(CD, "send_bsr_packet.py")
5281 cmd = "{} {} '{}' '{}' --interval=1 --count=1".format(
5282 python3_path, script_path, packet, sender_interface
5283 )
5284
5285 logger.info("Scapy cmd: \n %s", cmd)
5286 result = rnode.run(cmd)
5287
5288 if result == "":
5289 return result
5290
5291 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
5292 return True