]> git.proxmox.com Git - mirror_frr.git/blob - tests/topotests/lib/common_config.py
Merge pull request #11485 from AbhishekNR/ipv6_mld_todo
[mirror_frr.git] / tests / topotests / lib / common_config.py
1 #
2 # Copyright (c) 2019 by VMware, Inc. ("VMware")
3 # Used Copyright (c) 2018 by Network Device Education Foundation, Inc.
4 # ("NetDEF") in this file.
5 #
6 # Permission to use, copy, modify, and/or distribute this software
7 # for any purpose with or without fee is hereby granted, provided
8 # that the above copyright notice and this permission notice appear
9 # in all copies.
10 #
11 # THE SOFTWARE IS PROVIDED "AS IS" AND VMWARE DISCLAIMS ALL WARRANTIES
12 # WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
13 # MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL VMWARE BE LIABLE FOR
14 # ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY
15 # DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS,
16 # WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
17 # ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE
18 # OF THIS SOFTWARE.
19 #
20
21 import ipaddress
22 import json
23 import os
24 import platform
25 import socket
26 import subprocess
27 import sys
28 import traceback
29 import functools
30 from collections import OrderedDict
31 from copy import deepcopy
32 from datetime import datetime, timedelta
33 from functools import wraps
34 from re import search as re_search
35 from time import sleep
36
37 try:
38 # Imports from python2
39 import ConfigParser as configparser
40 except ImportError:
41 # Imports from python3
42 import configparser
43
44 from lib.micronet import comm_error
45 from lib.topogen import TopoRouter, get_topogen
46 from lib.topolog import get_logger, logger
47 from lib.topotest import frr_unicode, interface_set_status, version_cmp
48 from lib import topotest
49
50 FRRCFG_FILE = "frr_json.conf"
51 FRRCFG_BKUP_FILE = "frr_json_initial.conf"
52
53 ERROR_LIST = ["Malformed", "Failure", "Unknown", "Incomplete"]
54
55 ####
56 CD = os.path.dirname(os.path.realpath(__file__))
57 PYTESTINI_PATH = os.path.join(CD, "../pytest.ini")
58
59 # NOTE: to save execution logs to log file frrtest_log_dir must be configured
60 # in `pytest.ini`.
61 config = configparser.ConfigParser()
62 config.read(PYTESTINI_PATH)
63
64 config_section = "topogen"
65
66 # Debug logs for daemons
67 DEBUG_LOGS = {
68 "pimd": [
69 "debug msdp events",
70 "debug msdp packets",
71 "debug igmp events",
72 "debug igmp trace",
73 "debug mroute",
74 "debug mroute detail",
75 "debug pim events",
76 "debug pim packets",
77 "debug pim trace",
78 "debug pim zebra",
79 "debug pim bsm",
80 "debug pim packets joins",
81 "debug pim packets register",
82 "debug pim nht",
83 ],
84 "bgpd": [
85 "debug bgp neighbor-events",
86 "debug bgp updates",
87 "debug bgp zebra",
88 "debug bgp nht",
89 "debug bgp neighbor-events",
90 "debug bgp graceful-restart",
91 "debug bgp update-groups",
92 "debug bgp vpn leak-from-vrf",
93 "debug bgp vpn leak-to-vrf",
94 "debug bgp zebr",
95 "debug bgp updates",
96 "debug bgp nht",
97 "debug bgp neighbor-events",
98 "debug vrf",
99 ],
100 "zebra": [
101 "debug zebra events",
102 "debug zebra rib",
103 "debug zebra vxlan",
104 "debug zebra nht",
105 ],
106 "ospf": [
107 "debug ospf event",
108 "debug ospf ism",
109 "debug ospf lsa",
110 "debug ospf nsm",
111 "debug ospf nssa",
112 "debug ospf packet all",
113 "debug ospf sr",
114 "debug ospf te",
115 "debug ospf zebra",
116 ],
117 "ospf6": [
118 "debug ospf6 event",
119 "debug ospf6 ism",
120 "debug ospf6 lsa",
121 "debug ospf6 nsm",
122 "debug ospf6 nssa",
123 "debug ospf6 packet all",
124 "debug ospf6 sr",
125 "debug ospf6 te",
126 "debug ospf6 zebra",
127 ],
128 }
129
130 g_iperf_client_procs = {}
131 g_iperf_server_procs = {}
132
133
134 def is_string(value):
135 try:
136 return isinstance(value, basestring)
137 except NameError:
138 return isinstance(value, str)
139
140
141 if config.has_option("topogen", "verbosity"):
142 loglevel = config.get("topogen", "verbosity")
143 loglevel = loglevel.lower()
144 else:
145 loglevel = "info"
146
147 if config.has_option("topogen", "frrtest_log_dir"):
148 frrtest_log_dir = config.get("topogen", "frrtest_log_dir")
149 time_stamp = datetime.time(datetime.now())
150 logfile_name = "frr_test_bgp_"
151 frrtest_log_file = frrtest_log_dir + logfile_name + str(time_stamp)
152 print("frrtest_log_file..", frrtest_log_file)
153
154 logger = get_logger(
155 "test_execution_logs", log_level=loglevel, target=frrtest_log_file
156 )
157 print("Logs will be sent to logfile: {}".format(frrtest_log_file))
158
159 if config.has_option("topogen", "show_router_config"):
160 show_router_config = config.get("topogen", "show_router_config")
161 else:
162 show_router_config = False
163
164 # env variable for setting what address type to test
165 ADDRESS_TYPES = os.environ.get("ADDRESS_TYPES")
166
167
168 # Saves sequence id numbers
169 SEQ_ID = {"prefix_lists": {}, "route_maps": {}}
170
171
172 def get_seq_id(obj_type, router, obj_name):
173 """
174 Generates and saves sequence number in interval of 10
175 Parameters
176 ----------
177 * `obj_type`: prefix_lists or route_maps
178 * `router`: router name
179 *` obj_name`: name of the prefix-list or route-map
180 Returns
181 --------
182 Sequence number generated
183 """
184
185 router_data = SEQ_ID[obj_type].setdefault(router, {})
186 obj_data = router_data.setdefault(obj_name, {})
187 seq_id = obj_data.setdefault("seq_id", 0)
188
189 seq_id = int(seq_id) + 10
190 obj_data["seq_id"] = seq_id
191
192 return seq_id
193
194
195 def set_seq_id(obj_type, router, id, obj_name):
196 """
197 Saves sequence number if not auto-generated and given by user
198 Parameters
199 ----------
200 * `obj_type`: prefix_lists or route_maps
201 * `router`: router name
202 *` obj_name`: name of the prefix-list or route-map
203 """
204 router_data = SEQ_ID[obj_type].setdefault(router, {})
205 obj_data = router_data.setdefault(obj_name, {})
206 seq_id = obj_data.setdefault("seq_id", 0)
207
208 seq_id = int(seq_id) + int(id)
209 obj_data["seq_id"] = seq_id
210
211
212 class InvalidCLIError(Exception):
213 """Raise when the CLI command is wrong"""
214
215
216 def run_frr_cmd(rnode, cmd, isjson=False):
217 """
218 Execute frr show commands in privileged mode
219 * `rnode`: router node on which command needs to be executed
220 * `cmd`: Command to be executed on frr
221 * `isjson`: If command is to get json data or not
222 :return str:
223 """
224
225 if cmd:
226 ret_data = rnode.vtysh_cmd(cmd, isjson=isjson)
227
228 if isjson:
229 rnode.vtysh_cmd(cmd.rstrip("json"), isjson=False)
230
231 return ret_data
232
233 else:
234 raise InvalidCLIError("No actual cmd passed")
235
236
237 def apply_raw_config(tgen, input_dict):
238
239 """
240 API to configure raw configuration on device. This can be used for any cli
241 which has not been implemented in JSON.
242
243 Parameters
244 ----------
245 * `tgen`: tgen object
246 * `input_dict`: configuration that needs to be applied
247
248 Usage
249 -----
250 input_dict = {
251 "r2": {
252 "raw_config": [
253 "router bgp",
254 "no bgp update-group-split-horizon"
255 ]
256 }
257 }
258 Returns
259 -------
260 True or errormsg
261 """
262
263 rlist = []
264
265 for router_name in input_dict.keys():
266 config_cmd = input_dict[router_name]["raw_config"]
267
268 if not isinstance(config_cmd, list):
269 config_cmd = [config_cmd]
270
271 frr_cfg_file = "{}/{}/{}".format(tgen.logdir, router_name, FRRCFG_FILE)
272 with open(frr_cfg_file, "w") as cfg:
273 for cmd in config_cmd:
274 cfg.write("{}\n".format(cmd))
275
276 rlist.append(router_name)
277
278 # Load config on all routers
279 return load_config_to_routers(tgen, rlist)
280
281
282 def create_common_configurations(
283 tgen, config_dict, config_type=None, build=False, load_config=True
284 ):
285 """
286 API to create object of class FRRConfig and also create frr_json.conf
287 file. It will create interface and common configurations and save it to
288 frr_json.conf and load to router
289 Parameters
290 ----------
291 * `tgen`: tgen object
292 * `config_dict`: Configuration data saved in a dict of { router: config-list }
293 * `routers` : list of router id to be configured.
294 * `config_type` : Syntactic information while writing configuration. Should
295 be one of the value as mentioned in the config_map below.
296 * `build` : Only for initial setup phase this is set as True
297 Returns
298 -------
299 True or False
300 """
301
302 config_map = OrderedDict(
303 {
304 "general_config": "! FRR General Config\n",
305 "debug_log_config": "! Debug log Config\n",
306 "interface_config": "! Interfaces Config\n",
307 "static_route": "! Static Route Config\n",
308 "prefix_list": "! Prefix List Config\n",
309 "bgp_community_list": "! Community List Config\n",
310 "route_maps": "! Route Maps Config\n",
311 "bgp": "! BGP Config\n",
312 "vrf": "! VRF Config\n",
313 "ospf": "! OSPF Config\n",
314 "ospf6": "! OSPF Config\n",
315 "pim": "! PIM Config\n",
316 }
317 )
318
319 if build:
320 mode = "a"
321 elif not load_config:
322 mode = "a"
323 else:
324 mode = "w"
325
326 routers = config_dict.keys()
327 for router in routers:
328 fname = "{}/{}/{}".format(tgen.logdir, router, FRRCFG_FILE)
329 try:
330 frr_cfg_fd = open(fname, mode)
331 if config_type:
332 frr_cfg_fd.write(config_map[config_type])
333 for line in config_dict[router]:
334 frr_cfg_fd.write("{} \n".format(str(line)))
335 frr_cfg_fd.write("\n")
336
337 except IOError as err:
338 logger.error("Unable to open FRR Config '%s': %s" % (fname, str(err)))
339 return False
340 finally:
341 frr_cfg_fd.close()
342
343 # If configuration applied from build, it will done at last
344 result = True
345 if not build and load_config:
346 result = load_config_to_routers(tgen, routers)
347
348 return result
349
350
351 def create_common_configuration(
352 tgen, router, data, config_type=None, build=False, load_config=True
353 ):
354 """
355 API to create object of class FRRConfig and also create frr_json.conf
356 file. It will create interface and common configurations and save it to
357 frr_json.conf and load to router
358 Parameters
359 ----------
360 * `tgen`: tgen object
361 * `data`: Configuration data saved in a list.
362 * `router` : router id to be configured.
363 * `config_type` : Syntactic information while writing configuration. Should
364 be one of the value as mentioned in the config_map below.
365 * `build` : Only for initial setup phase this is set as True
366 Returns
367 -------
368 True or False
369 """
370 return create_common_configurations(
371 tgen, {router: data}, config_type, build, load_config
372 )
373
374
375 def kill_router_daemons(tgen, router, daemons, save_config=True):
376 """
377 Router's current config would be saved to /etc/frr/ for each daemon
378 and daemon would be killed forcefully using SIGKILL.
379 * `tgen` : topogen object
380 * `router`: Device under test
381 * `daemons`: list of daemons to be killed
382 """
383
384 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
385
386 try:
387 router_list = tgen.routers()
388
389 if save_config:
390 # Saving router config to /etc/frr, which will be loaded to router
391 # when it starts
392 router_list[router].vtysh_cmd("write memory")
393
394 # Kill Daemons
395 result = router_list[router].killDaemons(daemons)
396 if len(result) > 0:
397 assert "Errors found post shutdown - details follow:" == 0, result
398 return result
399
400 except Exception as e:
401 errormsg = traceback.format_exc()
402 logger.error(errormsg)
403 return errormsg
404
405
406 def start_router_daemons(tgen, router, daemons):
407 """
408 Daemons defined by user would be started
409 * `tgen` : topogen object
410 * `router`: Device under test
411 * `daemons`: list of daemons to be killed
412 """
413
414 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
415
416 try:
417 router_list = tgen.routers()
418
419 # Start daemons
420 res = router_list[router].startDaemons(daemons)
421
422 except Exception as e:
423 errormsg = traceback.format_exc()
424 logger.error(errormsg)
425 res = errormsg
426
427 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
428 return res
429
430
431 def check_router_status(tgen):
432 """
433 Check if all daemons are running for all routers in topology
434 * `tgen` : topogen object
435 """
436
437 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
438
439 try:
440 router_list = tgen.routers()
441 for router, rnode in router_list.items():
442
443 result = rnode.check_router_running()
444 if result != "":
445 daemons = []
446 if "bgpd" in result:
447 daemons.append("bgpd")
448 if "zebra" in result:
449 daemons.append("zebra")
450 if "pimd" in result:
451 daemons.append("pimd")
452 if "pim6d" in result:
453 daemons.append("pim6d")
454 if "ospfd" in result:
455 daemons.append("ospfd")
456 if "ospf6d" in result:
457 daemons.append("ospf6d")
458 rnode.startDaemons(daemons)
459
460 except Exception as e:
461 errormsg = traceback.format_exc()
462 logger.error(errormsg)
463 return errormsg
464
465 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
466 return True
467
468
469 def save_initial_config_on_routers(tgen):
470 """Save current configuration on routers to FRRCFG_BKUP_FILE.
471
472 FRRCFG_BKUP_FILE is the file that will be restored when `reset_config_on_routers()`
473 is called.
474
475 Parameters
476 ----------
477 * `tgen` : Topogen object
478 """
479 router_list = tgen.routers()
480 target_cfg_fmt = tgen.logdir + "/{}/frr_json_initial.conf"
481
482 # Get all running configs in parallel
483 procs = {}
484 for rname in router_list:
485 logger.info("Fetching running config for router %s", rname)
486 procs[rname] = router_list[rname].popen(
487 ["/usr/bin/env", "vtysh", "-c", "show running-config no-header"],
488 stdin=None,
489 stdout=open(target_cfg_fmt.format(rname), "w"),
490 stderr=subprocess.PIPE,
491 )
492 for rname, p in procs.items():
493 _, error = p.communicate()
494 if p.returncode:
495 logger.error(
496 "Get running config for %s failed %d: %s", rname, p.returncode, error
497 )
498 raise InvalidCLIError(
499 "vtysh show running error on {}: {}".format(rname, error)
500 )
501
502
503 def reset_config_on_routers(tgen, routerName=None):
504 """
505 Resets configuration on routers to the snapshot created using input JSON
506 file. It replaces existing router configuration with FRRCFG_BKUP_FILE
507
508 Parameters
509 ----------
510 * `tgen` : Topogen object
511 * `routerName` : router config is to be reset
512 """
513
514 logger.debug("Entering API: reset_config_on_routers")
515
516 tgen.cfg_gen += 1
517 gen = tgen.cfg_gen
518
519 # Trim the router list if needed
520 router_list = tgen.routers()
521 if routerName:
522 if routerName not in router_list:
523 logger.warning(
524 "Exiting API: reset_config_on_routers: no router %s",
525 routerName,
526 exc_info=True,
527 )
528 return True
529 router_list = {routerName: router_list[routerName]}
530
531 delta_fmt = tgen.logdir + "/{}/delta-{}.conf"
532 # FRRCFG_BKUP_FILE
533 target_cfg_fmt = tgen.logdir + "/{}/frr_json_initial.conf"
534 run_cfg_fmt = tgen.logdir + "/{}/frr-{}.sav"
535
536 #
537 # Get all running configs in parallel
538 #
539 procs = {}
540 for rname in router_list:
541 logger.info("Fetching running config for router %s", rname)
542 procs[rname] = router_list[rname].popen(
543 ["/usr/bin/env", "vtysh", "-c", "show running-config no-header"],
544 stdin=None,
545 stdout=open(run_cfg_fmt.format(rname, gen), "w"),
546 stderr=subprocess.PIPE,
547 )
548 for rname, p in procs.items():
549 _, error = p.communicate()
550 if p.returncode:
551 logger.error(
552 "Get running config for %s failed %d: %s", rname, p.returncode, error
553 )
554 raise InvalidCLIError(
555 "vtysh show running error on {}: {}".format(rname, error)
556 )
557
558 #
559 # Get all delta's in parallel
560 #
561 procs = {}
562 for rname in router_list:
563 logger.info(
564 "Generating delta for router %s to new configuration (gen %d)", rname, gen
565 )
566 procs[rname] = tgen.net.popen(
567 [
568 "/usr/lib/frr/frr-reload.py",
569 "--test-reset",
570 "--input",
571 run_cfg_fmt.format(rname, gen),
572 "--test",
573 target_cfg_fmt.format(rname),
574 ],
575 stdin=None,
576 stdout=open(delta_fmt.format(rname, gen), "w"),
577 stderr=subprocess.PIPE,
578 )
579 for rname, p in procs.items():
580 _, error = p.communicate()
581 if p.returncode:
582 logger.error(
583 "Delta file creation for %s failed %d: %s", rname, p.returncode, error
584 )
585 raise InvalidCLIError("frr-reload error for {}: {}".format(rname, error))
586
587 #
588 # Apply all the deltas in parallel
589 #
590 procs = {}
591 for rname in router_list:
592 logger.info("Applying delta config on router %s", rname)
593
594 procs[rname] = router_list[rname].popen(
595 ["/usr/bin/env", "vtysh", "-f", delta_fmt.format(rname, gen)],
596 stdin=None,
597 stdout=subprocess.PIPE,
598 stderr=subprocess.STDOUT,
599 )
600 for rname, p in procs.items():
601 output, _ = p.communicate()
602 vtysh_command = "vtysh -f {}".format(delta_fmt.format(rname, gen))
603 if not p.returncode:
604 router_list[rname].logger.info(
605 '\nvtysh config apply => "{}"\nvtysh output <= "{}"'.format(
606 vtysh_command, output
607 )
608 )
609 else:
610 router_list[rname].logger.warning(
611 '\nvtysh config apply failed => "{}"\nvtysh output <= "{}"'.format(
612 vtysh_command, output
613 )
614 )
615 logger.error(
616 "Delta file apply for %s failed %d: %s", rname, p.returncode, output
617 )
618
619 # We really need to enable this failure; however, currently frr-reload.py
620 # producing invalid "no" commands as it just preprends "no", but some of the
621 # command forms lack matching values (e.g., final values). Until frr-reload
622 # is fixed to handle this (or all the CLI no forms are adjusted) we can't
623 # fail tests.
624 # raise InvalidCLIError("frr-reload error for {}: {}".format(rname, output))
625
626 #
627 # Optionally log all new running config if "show_router_config" is defined in
628 # "pytest.ini"
629 #
630 if show_router_config:
631 procs = {}
632 for rname in router_list:
633 logger.info("Fetching running config for router %s", rname)
634 procs[rname] = router_list[rname].popen(
635 ["/usr/bin/env", "vtysh", "-c", "show running-config no-header"],
636 stdin=None,
637 stdout=subprocess.PIPE,
638 stderr=subprocess.STDOUT,
639 )
640 for rname, p in procs.items():
641 output, _ = p.communicate()
642 if p.returncode:
643 logger.warning(
644 "Get running config for %s failed %d: %s",
645 rname,
646 p.returncode,
647 output,
648 )
649 else:
650 logger.info(
651 "Configuration on router %s after reset:\n%s", rname, output
652 )
653
654 logger.debug("Exiting API: reset_config_on_routers")
655 return True
656
657
658 def prep_load_config_to_routers(tgen, *config_name_list):
659 """Create common config for `load_config_to_routers`.
660
661 The common config file is constructed from the list of sub-config files passed as
662 position arguments to this function. Each entry in `config_name_list` is looked for
663 under the router sub-directory in the test directory and those files are
664 concatenated together to create the common config. e.g.,
665
666 # Routers are "r1" and "r2", test file is `example/test_example_foo.py`
667 prepare_load_config_to_routers(tgen, "bgpd.conf", "ospfd.conf")
668
669 When the above call is made the files in
670
671 example/r1/bgpd.conf
672 example/r1/ospfd.conf
673
674 Are concat'd together into a single config file that will be loaded on r1, and
675
676 example/r2/bgpd.conf
677 example/r2/ospfd.conf
678
679 Are concat'd together into a single config file that will be loaded on r2 when
680 the call to `load_config_to_routers` is made.
681 """
682
683 routers = tgen.routers()
684 for rname, router in routers.items():
685 destname = "{}/{}/{}".format(tgen.logdir, rname, FRRCFG_FILE)
686 wmode = "w"
687 for cfbase in config_name_list:
688 script_dir = os.environ["PYTEST_TOPOTEST_SCRIPTDIR"]
689 confname = os.path.join(script_dir, "{}/{}".format(rname, cfbase))
690 with open(confname, "r") as cf:
691 with open(destname, wmode) as df:
692 df.write(cf.read())
693 wmode = "a"
694
695
696 def load_config_to_routers(tgen, routers, save_bkup=False):
697 """
698 Loads configuration on routers from the file FRRCFG_FILE.
699
700 Parameters
701 ----------
702 * `tgen` : Topogen object
703 * `routers` : routers for which configuration is to be loaded
704 * `save_bkup` : If True, Saves snapshot of FRRCFG_FILE to FRRCFG_BKUP_FILE
705 Returns
706 -------
707 True or False
708 """
709
710 logger.debug("Entering API: load_config_to_routers")
711
712 tgen.cfg_gen += 1
713 gen = tgen.cfg_gen
714
715 base_router_list = tgen.routers()
716 router_list = {}
717 for router in routers:
718 if router not in base_router_list:
719 continue
720 router_list[router] = base_router_list[router]
721
722 frr_cfg_file_fmt = tgen.logdir + "/{}/" + FRRCFG_FILE
723 frr_cfg_save_file_fmt = tgen.logdir + "/{}/{}-" + FRRCFG_FILE
724 frr_cfg_bkup_fmt = tgen.logdir + "/{}/" + FRRCFG_BKUP_FILE
725
726 procs = {}
727 for rname in router_list:
728 router = router_list[rname]
729 try:
730 frr_cfg_file = frr_cfg_file_fmt.format(rname)
731 frr_cfg_save_file = frr_cfg_save_file_fmt.format(rname, gen)
732 frr_cfg_bkup = frr_cfg_bkup_fmt.format(rname)
733 with open(frr_cfg_file, "r+") as cfg:
734 data = cfg.read()
735 logger.info(
736 "Applying following configuration on router %s (gen: %d):\n%s",
737 rname,
738 gen,
739 data,
740 )
741 # Always save a copy of what we just did
742 with open(frr_cfg_save_file, "w") as bkup:
743 bkup.write(data)
744 if save_bkup:
745 with open(frr_cfg_bkup, "w") as bkup:
746 bkup.write(data)
747 procs[rname] = router_list[rname].popen(
748 ["/usr/bin/env", "vtysh", "-f", frr_cfg_file],
749 stdin=None,
750 stdout=subprocess.PIPE,
751 stderr=subprocess.STDOUT,
752 )
753 except IOError as err:
754 logger.error(
755 "Unable to open config File. error(%s): %s", err.errno, err.strerror
756 )
757 return False
758 except Exception as error:
759 logger.error("Unable to apply config on %s: %s", rname, str(error))
760 return False
761
762 errors = []
763 for rname, p in procs.items():
764 output, _ = p.communicate()
765 frr_cfg_file = frr_cfg_file_fmt.format(rname)
766 vtysh_command = "vtysh -f " + frr_cfg_file
767 if not p.returncode:
768 router_list[rname].logger.info(
769 '\nvtysh config apply => "{}"\nvtysh output <= "{}"'.format(
770 vtysh_command, output
771 )
772 )
773 else:
774 router_list[rname].logger.error(
775 '\nvtysh config apply failed => "{}"\nvtysh output <= "{}"'.format(
776 vtysh_command, output
777 )
778 )
779 logger.error(
780 "Config apply for %s failed %d: %s", rname, p.returncode, output
781 )
782 # We can't thorw an exception here as we won't clear the config file.
783 errors.append(
784 InvalidCLIError(
785 "load_config_to_routers error for {}: {}".format(rname, output)
786 )
787 )
788
789 # Empty the config file or we append to it next time through.
790 with open(frr_cfg_file, "r+") as cfg:
791 cfg.truncate(0)
792
793 # Router current configuration to log file or console if
794 # "show_router_config" is defined in "pytest.ini"
795 if show_router_config:
796 procs = {}
797 for rname in router_list:
798 procs[rname] = router_list[rname].popen(
799 ["/usr/bin/env", "vtysh", "-c", "show running-config no-header"],
800 stdin=None,
801 stdout=subprocess.PIPE,
802 stderr=subprocess.STDOUT,
803 )
804 for rname, p in procs.items():
805 output, _ = p.communicate()
806 if p.returncode:
807 logger.warning(
808 "Get running config for %s failed %d: %s",
809 rname,
810 p.returncode,
811 output,
812 )
813 else:
814 logger.info("New configuration for router %s:\n%s", rname, output)
815
816 logger.debug("Exiting API: load_config_to_routers")
817 return not errors
818
819
820 def load_config_to_router(tgen, routerName, save_bkup=False):
821 """
822 Loads configuration on router from the file FRRCFG_FILE.
823
824 Parameters
825 ----------
826 * `tgen` : Topogen object
827 * `routerName` : router for which configuration to be loaded
828 * `save_bkup` : If True, Saves snapshot of FRRCFG_FILE to FRRCFG_BKUP_FILE
829 """
830 return load_config_to_routers(tgen, [routerName], save_bkup)
831
832
833 def reset_with_new_configs(tgen, *cflist):
834 """Reset the router to initial config, then load new configs.
835
836 Resets routers to the initial config state (see `save_initial_config_on_routers()
837 and `reset_config_on_routers()` `), then concat list of router sub-configs together
838 and load onto the routers (see `prep_load_config_to_routers()` and
839 `load_config_to_routers()`)
840 """
841 routers = tgen.routers()
842
843 reset_config_on_routers(tgen)
844 prep_load_config_to_routers(tgen, *cflist)
845 load_config_to_routers(tgen, tgen.routers(), save_bkup=False)
846
847
848 def get_frr_ipv6_linklocal(tgen, router, intf=None, vrf=None):
849 """
850 API to get the link local ipv6 address of a particular interface using
851 FRR command 'show interface'
852
853 * `tgen`: tgen object
854 * `router` : router for which highest interface should be
855 calculated
856 * `intf` : interface for which link-local address needs to be taken
857 * `vrf` : VRF name
858
859 Usage
860 -----
861 linklocal = get_frr_ipv6_linklocal(tgen, router, "intf1", RED_A)
862
863 Returns
864 -------
865 1) array of interface names to link local ips.
866 """
867
868 router_list = tgen.routers()
869 for rname, rnode in router_list.items():
870 if rname != router:
871 continue
872
873 linklocal = []
874
875 if vrf:
876 cmd = "show interface vrf {}".format(vrf)
877 else:
878 cmd = "show interface"
879
880 linklocal = []
881 if vrf:
882 cmd = "show interface vrf {}".format(vrf)
883 else:
884 cmd = "show interface"
885 for chk_ll in range(0, 60):
886 sleep(1 / 4)
887 ifaces = router_list[router].run('vtysh -c "{}"'.format(cmd))
888 # Fix newlines (make them all the same)
889 ifaces = ("\n".join(ifaces.splitlines()) + "\n").splitlines()
890
891 interface = None
892 ll_per_if_count = 0
893 for line in ifaces:
894 # Interface name
895 m = re_search("Interface ([a-zA-Z0-9-]+) is", line)
896 if m:
897 interface = m.group(1).split(" ")[0]
898 ll_per_if_count = 0
899
900 # Interface ip
901 m1 = re_search("inet6 (fe80[:a-fA-F0-9]+/[0-9]+)", line)
902 if m1:
903 local = m1.group(1)
904 ll_per_if_count += 1
905 if ll_per_if_count > 1:
906 linklocal += [["%s-%s" % (interface, ll_per_if_count), local]]
907 else:
908 linklocal += [[interface, local]]
909
910 try:
911 if linklocal:
912 if intf:
913 return [
914 _linklocal[1]
915 for _linklocal in linklocal
916 if _linklocal[0] == intf
917 ][0].split("/")[0]
918 return linklocal
919 except IndexError:
920 continue
921
922 errormsg = "Link local ip missing on router {}".format(router)
923 return errormsg
924
925
926 def generate_support_bundle():
927 """
928 API to generate support bundle on any verification ste failure.
929 it runs a python utility, /usr/lib/frr/generate_support_bundle.py,
930 which basically runs defined CLIs and dumps the data to specified location
931 """
932
933 tgen = get_topogen()
934 router_list = tgen.routers()
935 test_name = os.environ.get("PYTEST_CURRENT_TEST").split(":")[-1].split(" ")[0]
936
937 bundle_procs = {}
938 for rname, rnode in router_list.items():
939 logger.info("Spawn collection of support bundle for %s", rname)
940 dst_bundle = "{}/{}/support_bundles/{}".format(tgen.logdir, rname, test_name)
941 rnode.run("mkdir -p " + dst_bundle)
942
943 gen_sup_cmd = [
944 "/usr/lib/frr/generate_support_bundle.py",
945 "--log-dir=" + dst_bundle,
946 ]
947 bundle_procs[rname] = tgen.net[rname].popen(gen_sup_cmd, stdin=None)
948
949 for rname, rnode in router_list.items():
950 logger.info("Waiting on support bundle for %s", rname)
951 output, error = bundle_procs[rname].communicate()
952 if output:
953 logger.info(
954 "Output from collecting support bundle for %s:\n%s", rname, output
955 )
956 if error:
957 logger.warning(
958 "Error from collecting support bundle for %s:\n%s", rname, error
959 )
960
961 return True
962
963
964 def start_topology(tgen, daemon=None):
965 """
966 Starting topology, create tmp files which are loaded to routers
967 to start daemons and then start routers
968 * `tgen` : topogen object
969 """
970
971 # Starting topology
972 tgen.start_topology()
973
974 # Starting daemons
975
976 router_list = tgen.routers()
977 routers_sorted = sorted(
978 router_list.keys(), key=lambda x: int(re_search("[0-9]+", x).group(0))
979 )
980
981 linux_ver = ""
982 router_list = tgen.routers()
983 for rname in routers_sorted:
984 router = router_list[rname]
985
986 # It will help in debugging the failures, will give more details on which
987 # specific kernel version tests are failing
988 if linux_ver == "":
989 linux_ver = router.run("uname -a")
990 logger.info("Logging platform related details: \n %s \n", linux_ver)
991
992 try:
993 os.chdir(tgen.logdir)
994
995 # # Creating router named dir and empty zebra.conf bgpd.conf files
996 # # inside the current directory
997 # if os.path.isdir("{}".format(rname)):
998 # os.system("rm -rf {}".format(rname))
999 # os.mkdir("{}".format(rname))
1000 # os.system("chmod -R go+rw {}".format(rname))
1001 # os.chdir("{}/{}".format(tgen.logdir, rname))
1002 # os.system("touch zebra.conf bgpd.conf")
1003 # else:
1004 # os.mkdir("{}".format(rname))
1005 # os.system("chmod -R go+rw {}".format(rname))
1006 # os.chdir("{}/{}".format(tgen.logdir, rname))
1007 # os.system("touch zebra.conf bgpd.conf")
1008
1009 except IOError as err:
1010 logger.error("I/O error({0}): {1}".format(err.errno, err.strerror))
1011
1012 # Loading empty zebra.conf file to router, to start the zebra daemon
1013 router.load_config(
1014 TopoRouter.RD_ZEBRA, "{}/{}/zebra.conf".format(tgen.logdir, rname)
1015 )
1016
1017 # Loading empty bgpd.conf file to router, to start the bgp daemon
1018 router.load_config(
1019 TopoRouter.RD_BGP, "{}/{}/bgpd.conf".format(tgen.logdir, rname)
1020 )
1021
1022 if daemon and "ospfd" in daemon:
1023 # Loading empty ospf.conf file to router, to start the bgp daemon
1024 router.load_config(
1025 TopoRouter.RD_OSPF, "{}/{}/ospfd.conf".format(tgen.logdir, rname)
1026 )
1027
1028 if daemon and "ospf6d" in daemon:
1029 # Loading empty ospf.conf file to router, to start the bgp daemon
1030 router.load_config(
1031 TopoRouter.RD_OSPF6, "{}/{}/ospf6d.conf".format(tgen.logdir, rname)
1032 )
1033
1034 if daemon and "pimd" in daemon:
1035 # Loading empty pimd.conf file to router, to start the pim deamon
1036 router.load_config(
1037 TopoRouter.RD_PIM, "{}/{}/pimd.conf".format(tgen.logdir, rname)
1038 )
1039
1040 if daemon and "pim6d" in daemon:
1041 # Loading empty pimd.conf file to router, to start the pim6d deamon
1042 router.load_config(
1043 TopoRouter.RD_PIM6, "{}/{}/pim6d.conf".format(tgen.logdir, rname)
1044 )
1045
1046 # Starting routers
1047 logger.info("Starting all routers once topology is created")
1048 tgen.start_router()
1049
1050
1051 def stop_router(tgen, router):
1052 """
1053 Router"s current config would be saved to /tmp/topotest/<suite>/<router> for each daemon
1054 and router and its daemons would be stopped.
1055
1056 * `tgen` : topogen object
1057 * `router`: Device under test
1058 """
1059
1060 router_list = tgen.routers()
1061
1062 # Saving router config to /etc/frr, which will be loaded to router
1063 # when it starts
1064 router_list[router].vtysh_cmd("write memory")
1065
1066 # Stop router
1067 router_list[router].stop()
1068
1069
1070 def start_router(tgen, router):
1071 """
1072 Router will be started and config would be loaded from /tmp/topotest/<suite>/<router> for each
1073 daemon
1074
1075 * `tgen` : topogen object
1076 * `router`: Device under test
1077 """
1078
1079 logger.debug("Entering lib API: start_router")
1080
1081 try:
1082 router_list = tgen.routers()
1083
1084 # Router and its daemons would be started and config would
1085 # be loaded to router for each daemon from /etc/frr
1086 router_list[router].start()
1087
1088 # Waiting for router to come up
1089 sleep(5)
1090
1091 except Exception as e:
1092 errormsg = traceback.format_exc()
1093 logger.error(errormsg)
1094 return errormsg
1095
1096 logger.debug("Exiting lib API: start_router()")
1097 return True
1098
1099
1100 def number_to_row(routerName):
1101 """
1102 Returns the number for the router.
1103 Calculation based on name a0 = row 0, a1 = row 1, b2 = row 2, z23 = row 23
1104 etc
1105 """
1106 return int(routerName[1:])
1107
1108
1109 def number_to_column(routerName):
1110 """
1111 Returns the number for the router.
1112 Calculation based on name a0 = columnn 0, a1 = column 0, b2= column 1,
1113 z23 = column 26 etc
1114 """
1115 return ord(routerName[0]) - 97
1116
1117
1118 def topo_daemons(tgen, topo=None):
1119 """
1120 Returns daemon list required for the suite based on topojson.
1121 """
1122 daemon_list = []
1123
1124 if topo is None:
1125 topo = tgen.json_topo
1126
1127 router_list = tgen.routers()
1128 routers_sorted = sorted(
1129 router_list.keys(), key=lambda x: int(re_search("[0-9]+", x).group(0))
1130 )
1131
1132 for rtr in routers_sorted:
1133 if "ospf" in topo["routers"][rtr] and "ospfd" not in daemon_list:
1134 daemon_list.append("ospfd")
1135
1136 if "ospf6" in topo["routers"][rtr] and "ospf6d" not in daemon_list:
1137 daemon_list.append("ospf6d")
1138
1139 for val in topo["routers"][rtr]["links"].values():
1140 if "pim" in val and "pimd" not in daemon_list:
1141 daemon_list.append("pimd")
1142 if "pim6" in val and "pim6d" not in daemon_list:
1143 daemon_list.append("pim6d")
1144 if "ospf" in val and "ospfd" not in daemon_list:
1145 daemon_list.append("ospfd")
1146 if "ospf6" in val and "ospf6d" not in daemon_list:
1147 daemon_list.append("ospf6d")
1148 break
1149
1150 return daemon_list
1151
1152
1153 def add_interfaces_to_vlan(tgen, input_dict):
1154 """
1155 Add interfaces to VLAN, we need vlan pakcage to be installed on machine
1156
1157 * `tgen`: tgen onject
1158 * `input_dict` : interfaces to be added to vlans
1159
1160 input_dict= {
1161 "r1":{
1162 "vlan":{
1163 VLAN_1: [{
1164 intf_r1_s1: {
1165 "ip": "10.1.1.1",
1166 "subnet": "255.255.255.0
1167 }
1168 }]
1169 }
1170 }
1171 }
1172
1173 add_interfaces_to_vlan(tgen, input_dict)
1174
1175 """
1176
1177 router_list = tgen.routers()
1178 for dut in input_dict.keys():
1179 rnode = router_list[dut]
1180
1181 if "vlan" in input_dict[dut]:
1182 for vlan, interfaces in input_dict[dut]["vlan"].items():
1183 for intf_dict in interfaces:
1184 for interface, data in intf_dict.items():
1185 # Adding interface to VLAN
1186 vlan_intf = "{}.{}".format(interface, vlan)
1187 cmd = "ip link add link {} name {} type vlan id {}".format(
1188 interface, vlan_intf, vlan
1189 )
1190 logger.info("[DUT: %s]: Running command: %s", dut, cmd)
1191 result = rnode.run(cmd)
1192 logger.info("result %s", result)
1193
1194 # Bringing interface up
1195 cmd = "ip link set {} up".format(vlan_intf)
1196 logger.info("[DUT: %s]: Running command: %s", dut, cmd)
1197 result = rnode.run(cmd)
1198 logger.info("result %s", result)
1199
1200 # Assigning IP address
1201 ifaddr = ipaddress.ip_interface(
1202 "{}/{}".format(
1203 frr_unicode(data["ip"]), frr_unicode(data["subnet"])
1204 )
1205 )
1206
1207 cmd = "ip -{0} a flush {1} scope global && ip a add {2} dev {1} && ip l set {1} up".format(
1208 ifaddr.version, vlan_intf, ifaddr
1209 )
1210 logger.info("[DUT: %s]: Running command: %s", dut, cmd)
1211 result = rnode.run(cmd)
1212 logger.info("result %s", result)
1213
1214
1215 def tcpdump_capture_start(
1216 tgen,
1217 router,
1218 intf,
1219 protocol=None,
1220 grepstr=None,
1221 timeout=0,
1222 options=None,
1223 cap_file=None,
1224 background=True,
1225 ):
1226 """
1227 API to capture network packets using tcp dump.
1228
1229 Packages used :
1230
1231 Parameters
1232 ----------
1233 * `tgen`: topogen object.
1234 * `router`: router on which ping has to be performed.
1235 * `intf` : interface for capture.
1236 * `protocol` : protocol for which packet needs to be captured.
1237 * `grepstr` : string to filter out tcp dump output.
1238 * `timeout` : Time for which packet needs to be captured.
1239 * `options` : options for TCP dump, all tcpdump options can be used.
1240 * `cap_file` : filename to store capture dump.
1241 * `background` : Make tcp dump run in back ground.
1242
1243 Usage
1244 -----
1245 tcpdump_result = tcpdump_dut(tgen, 'r2', intf, protocol='tcp', timeout=20,
1246 options='-A -vv -x > r2bgp.txt ')
1247 Returns
1248 -------
1249 1) True for successful capture
1250 2) errormsg - when tcp dump fails
1251 """
1252
1253 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
1254
1255 rnode = tgen.gears[router]
1256
1257 if timeout > 0:
1258 cmd = "timeout {}".format(timeout)
1259 else:
1260 cmd = ""
1261
1262 cmdargs = "{} tcpdump".format(cmd)
1263
1264 if intf:
1265 cmdargs += " -i {}".format(str(intf))
1266 if protocol:
1267 cmdargs += " {}".format(str(protocol))
1268 if options:
1269 cmdargs += " -s 0 {}".format(str(options))
1270
1271 if cap_file:
1272 file_name = os.path.join(tgen.logdir, router, cap_file)
1273 cmdargs += " -w {}".format(str(file_name))
1274 # Remove existing capture file
1275 rnode.run("rm -rf {}".format(file_name))
1276
1277 if grepstr:
1278 cmdargs += ' | grep "{}"'.format(str(grepstr))
1279
1280 logger.info("Running tcpdump command: [%s]", cmdargs)
1281 if not background:
1282 rnode.run(cmdargs)
1283 else:
1284 # XXX this & is bogus doesn't work
1285 # rnode.run("nohup {} & /dev/null 2>&1".format(cmdargs))
1286 rnode.run("nohup {} > /dev/null 2>&1".format(cmdargs))
1287
1288 # Check if tcpdump process is running
1289 if background:
1290 result = rnode.run("pgrep tcpdump")
1291 logger.debug("ps -ef | grep tcpdump \n {}".format(result))
1292
1293 if not result:
1294 errormsg = "tcpdump is not running {}".format("tcpdump")
1295 return errormsg
1296 else:
1297 logger.info("Packet capture started on %s: interface %s", router, intf)
1298
1299 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
1300 return True
1301
1302
1303 def tcpdump_capture_stop(tgen, router):
1304 """
1305 API to capture network packets using tcp dump.
1306
1307 Packages used :
1308
1309 Parameters
1310 ----------
1311 * `tgen`: topogen object.
1312 * `router`: router on which ping has to be performed.
1313 * `intf` : interface for capture.
1314 * `protocol` : protocol for which packet needs to be captured.
1315 * `grepstr` : string to filter out tcp dump output.
1316 * `timeout` : Time for which packet needs to be captured.
1317 * `options` : options for TCP dump, all tcpdump options can be used.
1318 * `cap2file` : filename to store capture dump.
1319 * `bakgrnd` : Make tcp dump run in back ground.
1320
1321 Usage
1322 -----
1323 tcpdump_result = tcpdump_dut(tgen, 'r2', intf, protocol='tcp', timeout=20,
1324 options='-A -vv -x > r2bgp.txt ')
1325 Returns
1326 -------
1327 1) True for successful capture
1328 2) errormsg - when tcp dump fails
1329 """
1330
1331 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
1332
1333 rnode = tgen.gears[router]
1334
1335 # Check if tcpdump process is running
1336 result = rnode.run("ps -ef | grep tcpdump")
1337 logger.debug("ps -ef | grep tcpdump \n {}".format(result))
1338
1339 if not re_search(r"{}".format("tcpdump"), result):
1340 errormsg = "tcpdump is not running {}".format("tcpdump")
1341 return errormsg
1342 else:
1343 # XXX this doesn't work with micronet
1344 ppid = tgen.net.nameToNode[rnode.name].pid
1345 rnode.run("set +m; pkill -P %s tcpdump &> /dev/null" % ppid)
1346 logger.info("Stopped tcpdump capture")
1347
1348 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
1349 return True
1350
1351
1352 def create_debug_log_config(tgen, input_dict, build=False):
1353 """
1354 Enable/disable debug logs for any protocol with defined debug
1355 options and logs would be saved to created log file
1356
1357 Parameters
1358 ----------
1359 * `tgen` : Topogen object
1360 * `input_dict` : details to enable debug logs for protocols
1361 * `build` : Only for initial setup phase this is set as True.
1362
1363
1364 Usage:
1365 ------
1366 input_dict = {
1367 "r2": {
1368 "debug":{
1369 "log_file" : "debug.log",
1370 "enable": ["pimd", "zebra"],
1371 "disable": {
1372 "bgpd":[
1373 'debug bgp neighbor-events',
1374 'debug bgp updates',
1375 'debug bgp zebra',
1376 ]
1377 }
1378 }
1379 }
1380 }
1381
1382 result = create_debug_log_config(tgen, input_dict)
1383
1384 Returns
1385 -------
1386 True or False
1387 """
1388
1389 result = False
1390 try:
1391 debug_config_dict = {}
1392
1393 for router in input_dict.keys():
1394 debug_config = []
1395 if "debug" in input_dict[router]:
1396 debug_dict = input_dict[router]["debug"]
1397
1398 disable_logs = debug_dict.setdefault("disable", None)
1399 enable_logs = debug_dict.setdefault("enable", None)
1400 log_file = debug_dict.setdefault("log_file", None)
1401
1402 if log_file:
1403 _log_file = os.path.join(tgen.logdir, log_file)
1404 debug_config.append("log file {} \n".format(_log_file))
1405
1406 if type(enable_logs) is list:
1407 for daemon in enable_logs:
1408 for debug_log in DEBUG_LOGS[daemon]:
1409 debug_config.append("{}".format(debug_log))
1410 elif type(enable_logs) is dict:
1411 for daemon, debug_logs in enable_logs.items():
1412 for debug_log in debug_logs:
1413 debug_config.append("{}".format(debug_log))
1414
1415 if type(disable_logs) is list:
1416 for daemon in disable_logs:
1417 for debug_log in DEBUG_LOGS[daemon]:
1418 debug_config.append("no {}".format(debug_log))
1419 elif type(disable_logs) is dict:
1420 for daemon, debug_logs in disable_logs.items():
1421 for debug_log in debug_logs:
1422 debug_config.append("no {}".format(debug_log))
1423 if debug_config:
1424 debug_config_dict[router] = debug_config
1425
1426 result = create_common_configurations(
1427 tgen, debug_config_dict, "debug_log_config", build=build
1428 )
1429 except InvalidCLIError:
1430 # Traceback
1431 errormsg = traceback.format_exc()
1432 logger.error(errormsg)
1433 return errormsg
1434
1435 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
1436 return result
1437
1438
1439 #############################################
1440 # Common APIs, will be used by all protocols
1441 #############################################
1442
1443
1444 def create_vrf_cfg(tgen, topo, input_dict=None, build=False):
1445 """
1446 Create vrf configuration for created topology. VRF
1447 configuration is provided in input json file.
1448
1449 VRF config is done in Linux Kernel:
1450 * Create VRF
1451 * Attach interface to VRF
1452 * Bring up VRF
1453
1454 Parameters
1455 ----------
1456 * `tgen` : Topogen object
1457 * `topo` : json file data
1458 * `input_dict` : Input dict data, required when configuring
1459 from testcase
1460 * `build` : Only for initial setup phase this is set as True.
1461
1462 Usage
1463 -----
1464 input_dict={
1465 "r3": {
1466 "links": {
1467 "r2-link1": {"ipv4": "auto", "ipv6": "auto", "vrf": "RED_A"},
1468 "r2-link2": {"ipv4": "auto", "ipv6": "auto", "vrf": "RED_B"},
1469 "r2-link3": {"ipv4": "auto", "ipv6": "auto", "vrf": "BLUE_A"},
1470 "r2-link4": {"ipv4": "auto", "ipv6": "auto", "vrf": "BLUE_B"},
1471 },
1472 "vrfs":[
1473 {
1474 "name": "RED_A",
1475 "id": "1"
1476 },
1477 {
1478 "name": "RED_B",
1479 "id": "2"
1480 },
1481 {
1482 "name": "BLUE_A",
1483 "id": "3",
1484 "delete": True
1485 },
1486 {
1487 "name": "BLUE_B",
1488 "id": "4"
1489 }
1490 ]
1491 }
1492 }
1493 result = create_vrf_cfg(tgen, topo, input_dict)
1494
1495 Returns
1496 -------
1497 True or False
1498 """
1499 result = True
1500 if not input_dict:
1501 input_dict = deepcopy(topo)
1502 else:
1503 input_dict = deepcopy(input_dict)
1504
1505 try:
1506 config_data_dict = {}
1507
1508 for c_router, c_data in input_dict.items():
1509 rnode = tgen.gears[c_router]
1510 config_data = []
1511 if "vrfs" in c_data:
1512 for vrf in c_data["vrfs"]:
1513 name = vrf.setdefault("name", None)
1514 table_id = vrf.setdefault("id", None)
1515 del_action = vrf.setdefault("delete", False)
1516
1517 if del_action:
1518 # Kernel cmd- Add VRF and table
1519 cmd = "ip link del {} type vrf table {}".format(
1520 vrf["name"], vrf["id"]
1521 )
1522
1523 logger.info("[DUT: %s]: Running kernel cmd [%s]", c_router, cmd)
1524 rnode.run(cmd)
1525
1526 # Kernel cmd - Bring down VRF
1527 cmd = "ip link set dev {} down".format(name)
1528 logger.info("[DUT: %s]: Running kernel cmd [%s]", c_router, cmd)
1529 rnode.run(cmd)
1530
1531 else:
1532 if name and table_id:
1533 # Kernel cmd- Add VRF and table
1534 cmd = "ip link add {} type vrf table {}".format(
1535 name, table_id
1536 )
1537 logger.info(
1538 "[DUT: %s]: Running kernel cmd " "[%s]", c_router, cmd
1539 )
1540 rnode.run(cmd)
1541
1542 # Kernel cmd - Bring up VRF
1543 cmd = "ip link set dev {} up".format(name)
1544 logger.info(
1545 "[DUT: %s]: Running kernel " "cmd [%s]", c_router, cmd
1546 )
1547 rnode.run(cmd)
1548
1549 for vrf in c_data["vrfs"]:
1550 vni = vrf.setdefault("vni", None)
1551 del_vni = vrf.setdefault("no_vni", None)
1552
1553 if "links" in c_data:
1554 for destRouterLink, data in sorted(c_data["links"].items()):
1555 # Loopback interfaces
1556 if "type" in data and data["type"] == "loopback":
1557 interface_name = destRouterLink
1558 else:
1559 interface_name = data["interface"]
1560
1561 if "vrf" in data:
1562 vrf_list = data["vrf"]
1563
1564 if type(vrf_list) is not list:
1565 vrf_list = [vrf_list]
1566
1567 for _vrf in vrf_list:
1568 cmd = "ip link set {} master {}".format(
1569 interface_name, _vrf
1570 )
1571
1572 logger.info(
1573 "[DUT: %s]: Running" " kernel cmd [%s]",
1574 c_router,
1575 cmd,
1576 )
1577 rnode.run(cmd)
1578
1579 if vni:
1580 config_data.append("vrf {}".format(vrf["name"]))
1581 cmd = "vni {}".format(vni)
1582 config_data.append(cmd)
1583
1584 if del_vni:
1585 config_data.append("vrf {}".format(vrf["name"]))
1586 cmd = "no vni {}".format(del_vni)
1587 config_data.append(cmd)
1588
1589 if config_data:
1590 config_data_dict[c_router] = config_data
1591
1592 result = create_common_configurations(
1593 tgen, config_data_dict, "vrf", build=build
1594 )
1595
1596 except InvalidCLIError:
1597 # Traceback
1598 errormsg = traceback.format_exc()
1599 logger.error(errormsg)
1600 return errormsg
1601
1602 return result
1603
1604
1605 def create_interface_in_kernel(
1606 tgen, dut, name, ip_addr, vrf=None, netmask=None, create=True
1607 ):
1608 """
1609 Cretae interfaces in kernel for ipv4/ipv6
1610 Config is done in Linux Kernel:
1611
1612 Parameters
1613 ----------
1614 * `tgen` : Topogen object
1615 * `dut` : Device for which interfaces to be added
1616 * `name` : interface name
1617 * `ip_addr` : ip address for interface
1618 * `vrf` : VRF name, to which interface will be associated
1619 * `netmask` : netmask value, default is None
1620 * `create`: Create interface in kernel, if created then no need
1621 to create
1622 """
1623
1624 rnode = tgen.gears[dut]
1625
1626 if create:
1627 cmd = "ip link show {0} >/dev/null || ip link add {0} type dummy".format(name)
1628 rnode.run(cmd)
1629
1630 if not netmask:
1631 ifaddr = ipaddress.ip_interface(frr_unicode(ip_addr))
1632 else:
1633 ifaddr = ipaddress.ip_interface(
1634 "{}/{}".format(frr_unicode(ip_addr), frr_unicode(netmask))
1635 )
1636 cmd = "ip -{0} a flush {1} scope global && ip a add {2} dev {1} && ip l set {1} up".format(
1637 ifaddr.version, name, ifaddr
1638 )
1639 logger.info("[DUT: %s]: Running command: %s", dut, cmd)
1640 rnode.run(cmd)
1641
1642 if vrf:
1643 cmd = "ip link set {} master {}".format(name, vrf)
1644 rnode.run(cmd)
1645
1646
1647 def shutdown_bringup_interface_in_kernel(tgen, dut, intf_name, ifaceaction=False):
1648 """
1649 Cretae interfaces in kernel for ipv4/ipv6
1650 Config is done in Linux Kernel:
1651
1652 Parameters
1653 ----------
1654 * `tgen` : Topogen object
1655 * `dut` : Device for which interfaces to be added
1656 * `intf_name` : interface name
1657 * `ifaceaction` : False to shutdown and True to bringup the
1658 ineterface
1659 """
1660
1661 rnode = tgen.gears[dut]
1662
1663 cmd = "ip link set dev"
1664 if ifaceaction:
1665 action = "up"
1666 cmd = "{} {} {}".format(cmd, intf_name, action)
1667 else:
1668 action = "down"
1669 cmd = "{} {} {}".format(cmd, intf_name, action)
1670
1671 logger.info("[DUT: %s]: Running command: %s", dut, cmd)
1672 rnode.run(cmd)
1673
1674
1675 def validate_ip_address(ip_address):
1676 """
1677 Validates the type of ip address
1678 Parameters
1679 ----------
1680 * `ip_address`: IPv4/IPv6 address
1681 Returns
1682 -------
1683 Type of address as string
1684 """
1685
1686 if "/" in ip_address:
1687 ip_address = ip_address.split("/")[0]
1688
1689 v4 = True
1690 v6 = True
1691 try:
1692 socket.inet_aton(ip_address)
1693 except socket.error as error:
1694 logger.debug("Not a valid IPv4 address")
1695 v4 = False
1696 else:
1697 return "ipv4"
1698
1699 try:
1700 socket.inet_pton(socket.AF_INET6, ip_address)
1701 except socket.error as error:
1702 logger.debug("Not a valid IPv6 address")
1703 v6 = False
1704 else:
1705 return "ipv6"
1706
1707 if not v4 and not v6:
1708 raise Exception(
1709 "InvalidIpAddr", "%s is neither valid IPv4 or IPv6" " address" % ip_address
1710 )
1711
1712
1713 def check_address_types(addr_type=None):
1714 """
1715 Checks environment variable set and compares with the current address type
1716 """
1717
1718 addr_types_env = os.environ.get("ADDRESS_TYPES")
1719 if not addr_types_env:
1720 addr_types_env = "dual"
1721
1722 if addr_types_env == "dual":
1723 addr_types = ["ipv4", "ipv6"]
1724 elif addr_types_env == "ipv4":
1725 addr_types = ["ipv4"]
1726 elif addr_types_env == "ipv6":
1727 addr_types = ["ipv6"]
1728
1729 if addr_type is None:
1730 return addr_types
1731
1732 if addr_type not in addr_types:
1733 logger.debug(
1734 "{} not in supported/configured address types {}".format(
1735 addr_type, addr_types
1736 )
1737 )
1738 return False
1739
1740 return True
1741
1742
1743 def generate_ips(network, no_of_ips):
1744 """
1745 Returns list of IPs.
1746 based on start_ip and no_of_ips
1747
1748 * `network` : from here the ip will start generating,
1749 start_ip will be
1750 * `no_of_ips` : these many IPs will be generated
1751 """
1752 ipaddress_list = []
1753 if type(network) is not list:
1754 network = [network]
1755
1756 for start_ipaddr in network:
1757 if "/" in start_ipaddr:
1758 start_ip = start_ipaddr.split("/")[0]
1759 mask = int(start_ipaddr.split("/")[1])
1760 else:
1761 logger.debug("start_ipaddr {} must have a / in it".format(start_ipaddr))
1762 assert 0
1763
1764 addr_type = validate_ip_address(start_ip)
1765 if addr_type == "ipv4":
1766 if start_ip == "0.0.0.0" and mask == 0 and no_of_ips == 1:
1767 ipaddress_list.append("{}/{}".format(start_ip, mask))
1768 return ipaddress_list
1769 start_ip = ipaddress.IPv4Address(frr_unicode(start_ip))
1770 step = 2 ** (32 - mask)
1771 elif addr_type == "ipv6":
1772 if start_ip == "0::0" and mask == 0 and no_of_ips == 1:
1773 ipaddress_list.append("{}/{}".format(start_ip, mask))
1774 return ipaddress_list
1775 start_ip = ipaddress.IPv6Address(frr_unicode(start_ip))
1776 step = 2 ** (128 - mask)
1777 else:
1778 return []
1779
1780 next_ip = start_ip
1781 count = 0
1782 while count < no_of_ips:
1783 ipaddress_list.append("{}/{}".format(next_ip, mask))
1784 if addr_type == "ipv6":
1785 next_ip = ipaddress.IPv6Address(int(next_ip) + step)
1786 else:
1787 next_ip += step
1788 count += 1
1789
1790 return ipaddress_list
1791
1792
1793 def find_interface_with_greater_ip(topo, router, loopback=True, interface=True):
1794 """
1795 Returns highest interface ip for ipv4/ipv6. If loopback is there then
1796 it will return highest IP from loopback IPs otherwise from physical
1797 interface IPs.
1798 * `topo` : json file data
1799 * `router` : router for which highest interface should be calculated
1800 """
1801
1802 link_data = topo["routers"][router]["links"]
1803 lo_list = []
1804 interfaces_list = []
1805 lo_exists = False
1806 for destRouterLink, data in sorted(link_data.items()):
1807 if loopback:
1808 if "type" in data and data["type"] == "loopback":
1809 lo_exists = True
1810 ip_address = topo["routers"][router]["links"][destRouterLink][
1811 "ipv4"
1812 ].split("/")[0]
1813 lo_list.append(ip_address)
1814 if interface:
1815 ip_address = topo["routers"][router]["links"][destRouterLink]["ipv4"].split(
1816 "/"
1817 )[0]
1818 interfaces_list.append(ip_address)
1819
1820 if lo_exists:
1821 return sorted(lo_list)[-1]
1822
1823 return sorted(interfaces_list)[-1]
1824
1825
1826 def write_test_header(tc_name):
1827 """Display message at beginning of test case"""
1828 count = 20
1829 logger.info("*" * (len(tc_name) + count))
1830 step("START -> Testcase : %s" % tc_name, reset=True)
1831 logger.info("*" * (len(tc_name) + count))
1832
1833
1834 def write_test_footer(tc_name):
1835 """Display message at end of test case"""
1836 count = 21
1837 logger.info("=" * (len(tc_name) + count))
1838 logger.info("Testcase : %s -> PASSED", tc_name)
1839 logger.info("=" * (len(tc_name) + count))
1840
1841
1842 def interface_status(tgen, topo, input_dict):
1843 """
1844 Delete ip route maps from device
1845 * `tgen` : Topogen object
1846 * `topo` : json file data
1847 * `input_dict` : for which router, route map has to be deleted
1848 Usage
1849 -----
1850 input_dict = {
1851 "r3": {
1852 "interface_list": ['eth1-r1-r2', 'eth2-r1-r3'],
1853 "status": "down"
1854 }
1855 }
1856 Returns
1857 -------
1858 errormsg(str) or True
1859 """
1860 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
1861
1862 try:
1863 rlist = []
1864
1865 for router in input_dict.keys():
1866
1867 interface_list = input_dict[router]["interface_list"]
1868 status = input_dict[router].setdefault("status", "up")
1869 for intf in interface_list:
1870 rnode = tgen.gears[router]
1871 interface_set_status(rnode, intf, status)
1872
1873 rlist.append(router)
1874
1875 # Load config to routers
1876 load_config_to_routers(tgen, rlist)
1877
1878 except Exception as e:
1879 errormsg = traceback.format_exc()
1880 logger.error(errormsg)
1881 return errormsg
1882
1883 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
1884 return True
1885
1886
1887 def retry(retry_timeout, initial_wait=0, expected=True, diag_pct=0.75):
1888 """
1889 Fixture: Retries function while it's return value is an errormsg (str), False, or it raises an exception.
1890
1891 * `retry_timeout`: Retry for at least this many seconds; after waiting initial_wait seconds
1892 * `initial_wait`: Sleeps for this many seconds before first executing function
1893 * `expected`: if False then the return logic is inverted, except for exceptions,
1894 (i.e., a False or errmsg (str) function return ends the retry loop,
1895 and returns that False or str value)
1896 * `diag_pct`: Percentage of `retry_timeout` to keep testing after negative result would have
1897 been returned in order to see if a positive result comes after. This is an
1898 important diagnostic tool, and normally should not be disabled. Calls to wrapped
1899 functions though, can override the `diag_pct` value to make it larger in case more
1900 diagnostic retrying is appropriate.
1901 """
1902
1903 def _retry(func):
1904 @wraps(func)
1905 def func_retry(*args, **kwargs):
1906 # We will continue to retry diag_pct of the timeout value to see if test would have passed with a
1907 # longer retry timeout value.
1908 saved_failure = None
1909
1910 retry_sleep = 2
1911
1912 # Allow the wrapped function's args to override the fixtures
1913 _retry_timeout = kwargs.pop("retry_timeout", retry_timeout)
1914 _expected = kwargs.pop("expected", expected)
1915 _initial_wait = kwargs.pop("initial_wait", initial_wait)
1916 _diag_pct = kwargs.pop("diag_pct", diag_pct)
1917
1918 start_time = datetime.now()
1919 retry_until = datetime.now() + timedelta(
1920 seconds=_retry_timeout + _initial_wait
1921 )
1922
1923 if initial_wait > 0:
1924 logger.info("Waiting for [%s]s as initial delay", initial_wait)
1925 sleep(initial_wait)
1926
1927 invert_logic = not _expected
1928 while True:
1929 seconds_left = (retry_until - datetime.now()).total_seconds()
1930 try:
1931 ret = func(*args, **kwargs)
1932 logger.debug("Function returned %s", ret)
1933
1934 negative_result = ret is False or is_string(ret)
1935 if negative_result == invert_logic:
1936 # Simple case, successful result in time
1937 if not saved_failure:
1938 return ret
1939
1940 # Positive result, but happened after timeout failure, very important to
1941 # note for fixing tests.
1942 logger.warning(
1943 "RETRY DIAGNOSTIC: SUCCEED after FAILED with requested timeout of %.1fs; however, succeeded in %.1fs, investigate timeout timing",
1944 _retry_timeout,
1945 (datetime.now() - start_time).total_seconds(),
1946 )
1947 if isinstance(saved_failure, Exception):
1948 raise saved_failure # pylint: disable=E0702
1949 return saved_failure
1950
1951 except Exception as error:
1952 logger.info("Function raised exception: %s", str(error))
1953 ret = error
1954
1955 if seconds_left < 0 and saved_failure:
1956 logger.info(
1957 "RETRY DIAGNOSTIC: Retry timeout reached, still failing"
1958 )
1959 if isinstance(saved_failure, Exception):
1960 raise saved_failure # pylint: disable=E0702
1961 return saved_failure
1962
1963 if seconds_left < 0:
1964 logger.info("Retry timeout of %ds reached", _retry_timeout)
1965
1966 saved_failure = ret
1967 retry_extra_delta = timedelta(
1968 seconds=seconds_left + _retry_timeout * _diag_pct
1969 )
1970 retry_until = datetime.now() + retry_extra_delta
1971 seconds_left = retry_extra_delta.total_seconds()
1972
1973 # Generate bundle after setting remaining diagnostic retry time
1974 generate_support_bundle()
1975
1976 # If user has disabled diagnostic retries return now
1977 if not _diag_pct:
1978 if isinstance(saved_failure, Exception):
1979 raise saved_failure
1980 return saved_failure
1981
1982 if saved_failure:
1983 logger.info(
1984 "RETRY DIAG: [failure] Sleeping %ds until next retry with %.1f retry time left - too see if timeout was too short",
1985 retry_sleep,
1986 seconds_left,
1987 )
1988 else:
1989 logger.info(
1990 "Sleeping %ds until next retry with %.1f retry time left",
1991 retry_sleep,
1992 seconds_left,
1993 )
1994 sleep(retry_sleep)
1995
1996 func_retry._original = func
1997 return func_retry
1998
1999 return _retry
2000
2001
2002 class Stepper:
2003 """
2004 Prints step number for the test case step being executed
2005 """
2006
2007 count = 1
2008
2009 def __call__(self, msg, reset):
2010 if reset:
2011 Stepper.count = 1
2012 logger.info(msg)
2013 else:
2014 logger.info("STEP %s: '%s'", Stepper.count, msg)
2015 Stepper.count += 1
2016
2017
2018 def step(msg, reset=False):
2019 """
2020 Call Stepper to print test steps. Need to reset at the beginning of test.
2021 * ` msg` : Step message body.
2022 * `reset` : Reset step count to 1 when set to True.
2023 """
2024 _step = Stepper()
2025 _step(msg, reset)
2026
2027
2028 def do_countdown(secs):
2029 """
2030 Countdown timer display
2031 """
2032 for i in range(secs, 0, -1):
2033 sys.stdout.write("{} ".format(str(i)))
2034 sys.stdout.flush()
2035 sleep(1)
2036 return
2037
2038
2039 #############################################
2040 # These APIs, will used by testcase
2041 #############################################
2042 def create_interfaces_cfg(tgen, topo, build=False):
2043 """
2044 Create interface configuration for created topology. Basic Interface
2045 configuration is provided in input json file.
2046
2047 Parameters
2048 ----------
2049 * `tgen` : Topogen object
2050 * `topo` : json file data
2051 * `build` : Only for initial setup phase this is set as True.
2052
2053 Returns
2054 -------
2055 True or False
2056 """
2057
2058 def _create_interfaces_ospf_cfg(ospf, c_data, data, ospf_keywords):
2059 interface_data = []
2060 ip_ospf = "ipv6 ospf6" if ospf == "ospf6" else "ip ospf"
2061 for keyword in ospf_keywords:
2062 if keyword in data[ospf]:
2063 intf_ospf_value = c_data["links"][destRouterLink][ospf][keyword]
2064 if "delete" in data and data["delete"]:
2065 interface_data.append(
2066 "no {} {}".format(ip_ospf, keyword.replace("_", "-"))
2067 )
2068 else:
2069 interface_data.append(
2070 "{} {} {}".format(
2071 ip_ospf, keyword.replace("_", "-"), intf_ospf_value
2072 )
2073 )
2074 return interface_data
2075
2076 result = False
2077 topo = deepcopy(topo)
2078
2079 try:
2080 interface_data_dict = {}
2081
2082 for c_router, c_data in topo.items():
2083 interface_data = []
2084 for destRouterLink, data in sorted(c_data["links"].items()):
2085 # Loopback interfaces
2086 if "type" in data and data["type"] == "loopback":
2087 interface_name = destRouterLink
2088 else:
2089 interface_name = data["interface"]
2090
2091 interface_data.append("interface {}".format(str(interface_name)))
2092
2093 if "ipv4" in data:
2094 intf_addr = c_data["links"][destRouterLink]["ipv4"]
2095
2096 if "delete" in data and data["delete"]:
2097 interface_data.append("no ip address {}".format(intf_addr))
2098 else:
2099 interface_data.append("ip address {}".format(intf_addr))
2100 if "ipv6" in data:
2101 intf_addr = c_data["links"][destRouterLink]["ipv6"]
2102
2103 if "delete" in data and data["delete"]:
2104 interface_data.append("no ipv6 address {}".format(intf_addr))
2105 else:
2106 interface_data.append("ipv6 address {}".format(intf_addr))
2107
2108 # Wait for vrf interfaces to get link local address once they are up
2109 if (
2110 not destRouterLink == "lo"
2111 and "vrf" in topo[c_router]["links"][destRouterLink]
2112 ):
2113 vrf = topo[c_router]["links"][destRouterLink]["vrf"]
2114 intf = topo[c_router]["links"][destRouterLink]["interface"]
2115 ll = get_frr_ipv6_linklocal(tgen, c_router, intf=intf, vrf=vrf)
2116
2117 if "ipv6-link-local" in data:
2118 intf_addr = c_data["links"][destRouterLink]["ipv6-link-local"]
2119
2120 if "delete" in data and data["delete"]:
2121 interface_data.append("no ipv6 address {}".format(intf_addr))
2122 else:
2123 interface_data.append("ipv6 address {}\n".format(intf_addr))
2124
2125 ospf_keywords = [
2126 "hello_interval",
2127 "dead_interval",
2128 "network",
2129 "priority",
2130 "cost",
2131 "mtu_ignore",
2132 ]
2133 if "ospf" in data:
2134 interface_data += _create_interfaces_ospf_cfg(
2135 "ospf", c_data, data, ospf_keywords + ["area"]
2136 )
2137 if "ospf6" in data:
2138 interface_data += _create_interfaces_ospf_cfg(
2139 "ospf6", c_data, data, ospf_keywords + ["area"]
2140 )
2141 if interface_data:
2142 interface_data_dict[c_router] = interface_data
2143
2144 result = create_common_configurations(
2145 tgen, interface_data_dict, "interface_config", build=build
2146 )
2147
2148 except InvalidCLIError:
2149 # Traceback
2150 errormsg = traceback.format_exc()
2151 logger.error(errormsg)
2152 return errormsg
2153
2154 return result
2155
2156
2157 def create_static_routes(tgen, input_dict, build=False):
2158 """
2159 Create static routes for given router as defined in input_dict
2160
2161 Parameters
2162 ----------
2163 * `tgen` : Topogen object
2164 * `input_dict` : Input dict data, required when configuring from testcase
2165 * `build` : Only for initial setup phase this is set as True.
2166
2167 Usage
2168 -----
2169 input_dict should be in the format below:
2170 # static_routes: list of all routes
2171 # network: network address
2172 # no_of_ip: number of next-hop address that will be configured
2173 # admin_distance: admin distance for route/routes.
2174 # next_hop: starting next-hop address
2175 # tag: tag id for static routes
2176 # vrf: VRF name in which static routes needs to be created
2177 # delete: True if config to be removed. Default False.
2178
2179 Example:
2180 "routers": {
2181 "r1": {
2182 "static_routes": [
2183 {
2184 "network": "100.0.20.1/32",
2185 "no_of_ip": 9,
2186 "admin_distance": 100,
2187 "next_hop": "10.0.0.1",
2188 "tag": 4001,
2189 "vrf": "RED_A"
2190 "delete": true
2191 }
2192 ]
2193 }
2194 }
2195
2196 Returns
2197 -------
2198 errormsg(str) or True
2199 """
2200 result = False
2201 logger.debug("Entering lib API: create_static_routes()")
2202 input_dict = deepcopy(input_dict)
2203
2204 try:
2205 static_routes_list_dict = {}
2206
2207 for router in input_dict.keys():
2208 if "static_routes" not in input_dict[router]:
2209 errormsg = "static_routes not present in input_dict"
2210 logger.info(errormsg)
2211 continue
2212
2213 static_routes_list = []
2214
2215 static_routes = input_dict[router]["static_routes"]
2216 for static_route in static_routes:
2217 del_action = static_route.setdefault("delete", False)
2218 no_of_ip = static_route.setdefault("no_of_ip", 1)
2219 network = static_route.setdefault("network", [])
2220 if type(network) is not list:
2221 network = [network]
2222
2223 admin_distance = static_route.setdefault("admin_distance", None)
2224 tag = static_route.setdefault("tag", None)
2225 vrf = static_route.setdefault("vrf", None)
2226 interface = static_route.setdefault("interface", None)
2227 next_hop = static_route.setdefault("next_hop", None)
2228 nexthop_vrf = static_route.setdefault("nexthop_vrf", None)
2229
2230 ip_list = generate_ips(network, no_of_ip)
2231 for ip in ip_list:
2232 addr_type = validate_ip_address(ip)
2233
2234 if addr_type == "ipv4":
2235 cmd = "ip route {}".format(ip)
2236 else:
2237 cmd = "ipv6 route {}".format(ip)
2238
2239 if interface:
2240 cmd = "{} {}".format(cmd, interface)
2241
2242 if next_hop:
2243 cmd = "{} {}".format(cmd, next_hop)
2244
2245 if nexthop_vrf:
2246 cmd = "{} nexthop-vrf {}".format(cmd, nexthop_vrf)
2247
2248 if vrf:
2249 cmd = "{} vrf {}".format(cmd, vrf)
2250
2251 if tag:
2252 cmd = "{} tag {}".format(cmd, str(tag))
2253
2254 if admin_distance:
2255 cmd = "{} {}".format(cmd, admin_distance)
2256
2257 if del_action:
2258 cmd = "no {}".format(cmd)
2259
2260 static_routes_list.append(cmd)
2261
2262 if static_routes_list:
2263 static_routes_list_dict[router] = static_routes_list
2264
2265 result = create_common_configurations(
2266 tgen, static_routes_list_dict, "static_route", build=build
2267 )
2268
2269 except InvalidCLIError:
2270 # Traceback
2271 errormsg = traceback.format_exc()
2272 logger.error(errormsg)
2273 return errormsg
2274
2275 logger.debug("Exiting lib API: create_static_routes()")
2276 return result
2277
2278
2279 def create_prefix_lists(tgen, input_dict, build=False):
2280 """
2281 Create ip prefix lists as per the config provided in input
2282 JSON or input_dict
2283 Parameters
2284 ----------
2285 * `tgen` : Topogen object
2286 * `input_dict` : Input dict data, required when configuring from testcase
2287 * `build` : Only for initial setup phase this is set as True.
2288 Usage
2289 -----
2290 # pf_lists_1: name of prefix-list, user defined
2291 # seqid: prefix-list seqid, auto-generated if not given by user
2292 # network: criteria for applying prefix-list
2293 # action: permit/deny
2294 # le: less than or equal number of bits
2295 # ge: greater than or equal number of bits
2296 Example
2297 -------
2298 input_dict = {
2299 "r1": {
2300 "prefix_lists":{
2301 "ipv4": {
2302 "pf_list_1": [
2303 {
2304 "seqid": 10,
2305 "network": "any",
2306 "action": "permit",
2307 "le": "32",
2308 "ge": "30",
2309 "delete": True
2310 }
2311 ]
2312 }
2313 }
2314 }
2315 }
2316 Returns
2317 -------
2318 errormsg or True
2319 """
2320
2321 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
2322 result = False
2323 try:
2324 config_data_dict = {}
2325
2326 for router in input_dict.keys():
2327 if "prefix_lists" not in input_dict[router]:
2328 errormsg = "prefix_lists not present in input_dict"
2329 logger.debug(errormsg)
2330 continue
2331
2332 config_data = []
2333 prefix_lists = input_dict[router]["prefix_lists"]
2334 for addr_type, prefix_data in prefix_lists.items():
2335 if not check_address_types(addr_type):
2336 continue
2337
2338 for prefix_name, prefix_list in prefix_data.items():
2339 for prefix_dict in prefix_list:
2340 if "action" not in prefix_dict or "network" not in prefix_dict:
2341 errormsg = "'action' or network' missing in" " input_dict"
2342 return errormsg
2343
2344 network_addr = prefix_dict["network"]
2345 action = prefix_dict["action"]
2346 le = prefix_dict.setdefault("le", None)
2347 ge = prefix_dict.setdefault("ge", None)
2348 seqid = prefix_dict.setdefault("seqid", None)
2349 del_action = prefix_dict.setdefault("delete", False)
2350 if seqid is None:
2351 seqid = get_seq_id("prefix_lists", router, prefix_name)
2352 else:
2353 set_seq_id("prefix_lists", router, seqid, prefix_name)
2354
2355 if addr_type == "ipv4":
2356 protocol = "ip"
2357 else:
2358 protocol = "ipv6"
2359
2360 cmd = "{} prefix-list {} seq {} {} {}".format(
2361 protocol, prefix_name, seqid, action, network_addr
2362 )
2363 if le:
2364 cmd = "{} le {}".format(cmd, le)
2365 if ge:
2366 cmd = "{} ge {}".format(cmd, ge)
2367
2368 if del_action:
2369 cmd = "no {}".format(cmd)
2370
2371 config_data.append(cmd)
2372 if config_data:
2373 config_data_dict[router] = config_data
2374
2375 result = create_common_configurations(
2376 tgen, config_data_dict, "prefix_list", build=build
2377 )
2378
2379 except InvalidCLIError:
2380 # Traceback
2381 errormsg = traceback.format_exc()
2382 logger.error(errormsg)
2383 return errormsg
2384
2385 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
2386 return result
2387
2388
2389 def create_route_maps(tgen, input_dict, build=False):
2390 """
2391 Create route-map on the devices as per the arguments passed
2392 Parameters
2393 ----------
2394 * `tgen` : Topogen object
2395 * `input_dict` : Input dict data, required when configuring from testcase
2396 * `build` : Only for initial setup phase this is set as True.
2397 Usage
2398 -----
2399 # route_maps: key, value pair for route-map name and its attribute
2400 # rmap_match_prefix_list_1: user given name for route-map
2401 # action: PERMIT/DENY
2402 # match: key,value pair for match criteria. prefix_list, community-list,
2403 large-community-list or tag. Only one option at a time.
2404 # prefix_list: name of prefix list
2405 # large-community-list: name of large community list
2406 # community-ist: name of community list
2407 # tag: tag id for static routes
2408 # set: key, value pair for modifying route attributes
2409 # localpref: preference value for the network
2410 # med: metric value advertised for AS
2411 # aspath: set AS path value
2412 # weight: weight for the route
2413 # community: standard community value to be attached
2414 # large_community: large community value to be attached
2415 # community_additive: if set to "additive", adds community/large-community
2416 value to the existing values of the network prefix
2417 Example:
2418 --------
2419 input_dict = {
2420 "r1": {
2421 "route_maps": {
2422 "rmap_match_prefix_list_1": [
2423 {
2424 "action": "PERMIT",
2425 "match": {
2426 "ipv4": {
2427 "prefix_list": "pf_list_1"
2428 }
2429 "ipv6": {
2430 "prefix_list": "pf_list_1"
2431 }
2432 "large-community-list": {
2433 "id": "community_1",
2434 "exact_match": True
2435 }
2436 "community_list": {
2437 "id": "community_2",
2438 "exact_match": True
2439 }
2440 "tag": "tag_id"
2441 },
2442 "set": {
2443 "locPrf": 150,
2444 "metric": 30,
2445 "path": {
2446 "num": 20000,
2447 "action": "prepend",
2448 },
2449 "weight": 500,
2450 "community": {
2451 "num": "1:2 2:3",
2452 "action": additive
2453 }
2454 "large_community": {
2455 "num": "1:2:3 4:5;6",
2456 "action": additive
2457 },
2458 }
2459 }
2460 ]
2461 }
2462 }
2463 }
2464 Returns
2465 -------
2466 errormsg(str) or True
2467 """
2468
2469 result = False
2470 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
2471 input_dict = deepcopy(input_dict)
2472 try:
2473 rmap_data_dict = {}
2474
2475 for router in input_dict.keys():
2476 if "route_maps" not in input_dict[router]:
2477 logger.debug("route_maps not present in input_dict")
2478 continue
2479 rmap_data = []
2480 for rmap_name, rmap_value in input_dict[router]["route_maps"].items():
2481
2482 for rmap_dict in rmap_value:
2483 del_action = rmap_dict.setdefault("delete", False)
2484
2485 if del_action:
2486 rmap_data.append("no route-map {}".format(rmap_name))
2487 continue
2488
2489 if "action" not in rmap_dict:
2490 errormsg = "action not present in input_dict"
2491 logger.error(errormsg)
2492 return False
2493
2494 rmap_action = rmap_dict.setdefault("action", "deny")
2495
2496 seq_id = rmap_dict.setdefault("seq_id", None)
2497 if seq_id is None:
2498 seq_id = get_seq_id("route_maps", router, rmap_name)
2499 else:
2500 set_seq_id("route_maps", router, seq_id, rmap_name)
2501
2502 rmap_data.append(
2503 "route-map {} {} {}".format(rmap_name, rmap_action, seq_id)
2504 )
2505
2506 if "continue" in rmap_dict:
2507 continue_to = rmap_dict["continue"]
2508 if continue_to:
2509 rmap_data.append("on-match goto {}".format(continue_to))
2510 else:
2511 logger.error(
2512 "In continue, 'route-map entry "
2513 "sequence number' is not provided"
2514 )
2515 return False
2516
2517 if "goto" in rmap_dict:
2518 go_to = rmap_dict["goto"]
2519 if go_to:
2520 rmap_data.append("on-match goto {}".format(go_to))
2521 else:
2522 logger.error(
2523 "In goto, 'Goto Clause number' is not" " provided"
2524 )
2525 return False
2526
2527 if "call" in rmap_dict:
2528 call_rmap = rmap_dict["call"]
2529 if call_rmap:
2530 rmap_data.append("call {}".format(call_rmap))
2531 else:
2532 logger.error(
2533 "In call, 'destination Route-Map' is" " not provided"
2534 )
2535 return False
2536
2537 # Verifying if SET criteria is defined
2538 if "set" in rmap_dict:
2539 set_data = rmap_dict["set"]
2540 ipv4_data = set_data.setdefault("ipv4", {})
2541 ipv6_data = set_data.setdefault("ipv6", {})
2542 local_preference = set_data.setdefault("locPrf", None)
2543 metric = set_data.setdefault("metric", None)
2544 metric_type = set_data.setdefault("metric-type", None)
2545 as_path = set_data.setdefault("path", {})
2546 weight = set_data.setdefault("weight", None)
2547 community = set_data.setdefault("community", {})
2548 large_community = set_data.setdefault("large_community", {})
2549 large_comm_list = set_data.setdefault("large_comm_list", {})
2550 set_action = set_data.setdefault("set_action", None)
2551 nexthop = set_data.setdefault("nexthop", None)
2552 origin = set_data.setdefault("origin", None)
2553 ext_comm_list = set_data.setdefault("extcommunity", {})
2554 metrictype = set_data.setdefault("metric-type", {})
2555
2556 # Local Preference
2557 if local_preference:
2558 rmap_data.append(
2559 "set local-preference {}".format(local_preference)
2560 )
2561
2562 # Metric-Type
2563 if metrictype:
2564 rmap_data.append("set metric-type {}\n".format(metrictype))
2565
2566 # Metric
2567 if metric:
2568 del_comm = set_data.setdefault("delete", None)
2569 if del_comm:
2570 rmap_data.append("no set metric {}".format(metric))
2571 else:
2572 rmap_data.append("set metric {}".format(metric))
2573
2574 # Origin
2575 if origin:
2576 rmap_data.append("set origin {} \n".format(origin))
2577
2578 # AS Path Prepend
2579 if as_path:
2580 as_num = as_path.setdefault("as_num", None)
2581 as_action = as_path.setdefault("as_action", None)
2582 if as_action and as_num:
2583 rmap_data.append(
2584 "set as-path {} {}".format(as_action, as_num)
2585 )
2586
2587 # Community
2588 if community:
2589 num = community.setdefault("num", None)
2590 comm_action = community.setdefault("action", None)
2591 if num:
2592 cmd = "set community {}".format(num)
2593 if comm_action:
2594 cmd = "{} {}".format(cmd, comm_action)
2595 rmap_data.append(cmd)
2596 else:
2597 logger.error("In community, AS Num not" " provided")
2598 return False
2599
2600 if large_community:
2601 num = large_community.setdefault("num", None)
2602 comm_action = large_community.setdefault("action", None)
2603 if num:
2604 cmd = "set large-community {}".format(num)
2605 if comm_action:
2606 cmd = "{} {}".format(cmd, comm_action)
2607
2608 rmap_data.append(cmd)
2609 else:
2610 logger.error(
2611 "In large_community, AS Num not" " provided"
2612 )
2613 return False
2614 if large_comm_list:
2615 id = large_comm_list.setdefault("id", None)
2616 del_comm = large_comm_list.setdefault("delete", None)
2617 if id:
2618 cmd = "set large-comm-list {}".format(id)
2619 if del_comm:
2620 cmd = "{} delete".format(cmd)
2621
2622 rmap_data.append(cmd)
2623 else:
2624 logger.error("In large_comm_list 'id' not" " provided")
2625 return False
2626
2627 if ext_comm_list:
2628 rt = ext_comm_list.setdefault("rt", None)
2629 del_comm = ext_comm_list.setdefault("delete", None)
2630 if rt:
2631 cmd = "set extcommunity rt {}".format(rt)
2632 if del_comm:
2633 cmd = "{} delete".format(cmd)
2634
2635 rmap_data.append(cmd)
2636 else:
2637 logger.debug("In ext_comm_list 'rt' not" " provided")
2638 return False
2639
2640 # Weight
2641 if weight:
2642 rmap_data.append("set weight {}".format(weight))
2643 if ipv6_data:
2644 nexthop = ipv6_data.setdefault("nexthop", None)
2645 if nexthop:
2646 rmap_data.append("set ipv6 next-hop {}".format(nexthop))
2647
2648 # Adding MATCH and SET sequence to RMAP if defined
2649 if "match" in rmap_dict:
2650 match_data = rmap_dict["match"]
2651 ipv4_data = match_data.setdefault("ipv4", {})
2652 ipv6_data = match_data.setdefault("ipv6", {})
2653 community = match_data.setdefault("community_list", {})
2654 large_community = match_data.setdefault("large_community", {})
2655 large_community_list = match_data.setdefault(
2656 "large_community_list", {}
2657 )
2658
2659 metric = match_data.setdefault("metric", None)
2660 source_vrf = match_data.setdefault("source-vrf", None)
2661
2662 if ipv4_data:
2663 # fetch prefix list data from rmap
2664 prefix_name = ipv4_data.setdefault("prefix_lists", None)
2665 if prefix_name:
2666 rmap_data.append(
2667 "match ip address"
2668 " prefix-list {}".format(prefix_name)
2669 )
2670
2671 # fetch tag data from rmap
2672 tag = ipv4_data.setdefault("tag", None)
2673 if tag:
2674 rmap_data.append("match tag {}".format(tag))
2675
2676 # fetch large community data from rmap
2677 large_community_list = ipv4_data.setdefault(
2678 "large_community_list", {}
2679 )
2680 large_community = match_data.setdefault(
2681 "large_community", {}
2682 )
2683
2684 if ipv6_data:
2685 prefix_name = ipv6_data.setdefault("prefix_lists", None)
2686 if prefix_name:
2687 rmap_data.append(
2688 "match ipv6 address"
2689 " prefix-list {}".format(prefix_name)
2690 )
2691
2692 # fetch tag data from rmap
2693 tag = ipv6_data.setdefault("tag", None)
2694 if tag:
2695 rmap_data.append("match tag {}".format(tag))
2696
2697 # fetch large community data from rmap
2698 large_community_list = ipv6_data.setdefault(
2699 "large_community_list", {}
2700 )
2701 large_community = match_data.setdefault(
2702 "large_community", {}
2703 )
2704
2705 if community:
2706 if "id" not in community:
2707 logger.error(
2708 "'id' is mandatory for "
2709 "community-list in match"
2710 " criteria"
2711 )
2712 return False
2713 cmd = "match community {}".format(community["id"])
2714 exact_match = community.setdefault("exact_match", False)
2715 if exact_match:
2716 cmd = "{} exact-match".format(cmd)
2717
2718 rmap_data.append(cmd)
2719 if large_community:
2720 if "id" not in large_community:
2721 logger.error(
2722 "'id' is mandatory for "
2723 "large-community-list in match "
2724 "criteria"
2725 )
2726 return False
2727 cmd = "match large-community {}".format(
2728 large_community["id"]
2729 )
2730 exact_match = large_community.setdefault(
2731 "exact_match", False
2732 )
2733 if exact_match:
2734 cmd = "{} exact-match".format(cmd)
2735 rmap_data.append(cmd)
2736 if large_community_list:
2737 if "id" not in large_community_list:
2738 logger.error(
2739 "'id' is mandatory for "
2740 "large-community-list in match "
2741 "criteria"
2742 )
2743 return False
2744 cmd = "match large-community {}".format(
2745 large_community_list["id"]
2746 )
2747 exact_match = large_community_list.setdefault(
2748 "exact_match", False
2749 )
2750 if exact_match:
2751 cmd = "{} exact-match".format(cmd)
2752 rmap_data.append(cmd)
2753
2754 if source_vrf:
2755 cmd = "match source-vrf {}".format(source_vrf)
2756 rmap_data.append(cmd)
2757
2758 if metric:
2759 cmd = "match metric {}".format(metric)
2760 rmap_data.append(cmd)
2761
2762 if rmap_data:
2763 rmap_data_dict[router] = rmap_data
2764
2765 result = create_common_configurations(
2766 tgen, rmap_data_dict, "route_maps", build=build
2767 )
2768
2769 except InvalidCLIError:
2770 # Traceback
2771 errormsg = traceback.format_exc()
2772 logger.error(errormsg)
2773 return errormsg
2774
2775 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
2776 return result
2777
2778
2779 def delete_route_maps(tgen, input_dict):
2780 """
2781 Delete ip route maps from device
2782 * `tgen` : Topogen object
2783 * `input_dict` : for which router,
2784 route map has to be deleted
2785 Usage
2786 -----
2787 # Delete route-map rmap_1 and rmap_2 from router r1
2788 input_dict = {
2789 "r1": {
2790 "route_maps": ["rmap_1", "rmap__2"]
2791 }
2792 }
2793 result = delete_route_maps("ipv4", input_dict)
2794 Returns
2795 -------
2796 errormsg(str) or True
2797 """
2798 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
2799
2800 for router in input_dict.keys():
2801 route_maps = input_dict[router]["route_maps"][:]
2802 rmap_data = input_dict[router]
2803 rmap_data["route_maps"] = {}
2804 for route_map_name in route_maps:
2805 rmap_data["route_maps"].update({route_map_name: [{"delete": True}]})
2806
2807 return create_route_maps(tgen, input_dict)
2808
2809
2810 def create_bgp_community_lists(tgen, input_dict, build=False):
2811 """
2812 Create bgp community-list or large-community-list on the devices as per
2813 the arguments passed. Takes list of communities in input.
2814 Parameters
2815 ----------
2816 * `tgen` : Topogen object
2817 * `input_dict` : Input dict data, required when configuring from testcase
2818 * `build` : Only for initial setup phase this is set as True.
2819 Usage
2820 -----
2821 input_dict_1 = {
2822 "r3": {
2823 "bgp_community_lists": [
2824 {
2825 "community_type": "standard",
2826 "action": "permit",
2827 "name": "rmap_lcomm_{}".format(addr_type),
2828 "value": "1:1:1 1:2:3 2:1:1 2:2:2",
2829 "large": True
2830 }
2831 ]
2832 }
2833 }
2834 }
2835 result = create_bgp_community_lists(tgen, input_dict_1)
2836 """
2837
2838 result = False
2839 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
2840 input_dict = deepcopy(input_dict)
2841 try:
2842 config_data_dict = {}
2843
2844 for router in input_dict.keys():
2845 if "bgp_community_lists" not in input_dict[router]:
2846 errormsg = "bgp_community_lists not present in input_dict"
2847 logger.debug(errormsg)
2848 continue
2849
2850 config_data = []
2851
2852 community_list = input_dict[router]["bgp_community_lists"]
2853 for community_dict in community_list:
2854 del_action = community_dict.setdefault("delete", False)
2855 community_type = community_dict.setdefault("community_type", None)
2856 action = community_dict.setdefault("action", None)
2857 value = community_dict.setdefault("value", "")
2858 large = community_dict.setdefault("large", None)
2859 name = community_dict.setdefault("name", None)
2860 if large:
2861 cmd = "bgp large-community-list"
2862 else:
2863 cmd = "bgp community-list"
2864
2865 if not large and not (community_type and action and value):
2866 errormsg = (
2867 "community_type, action and value are "
2868 "required in bgp_community_list"
2869 )
2870 logger.error(errormsg)
2871 return False
2872
2873 cmd = "{} {} {} {} {}".format(cmd, community_type, name, action, value)
2874
2875 if del_action:
2876 cmd = "no {}".format(cmd)
2877
2878 config_data.append(cmd)
2879
2880 if config_data:
2881 config_data_dict[router] = config_data
2882
2883 result = create_common_configurations(
2884 tgen, config_data_dict, "bgp_community_list", build=build
2885 )
2886
2887 except InvalidCLIError:
2888 # Traceback
2889 errormsg = traceback.format_exc()
2890 logger.error(errormsg)
2891 return errormsg
2892
2893 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
2894 return result
2895
2896
2897 def shutdown_bringup_interface(tgen, dut, intf_name, ifaceaction=False):
2898 """
2899 Shutdown or bringup router's interface "
2900 * `tgen` : Topogen object
2901 * `dut` : Device under test
2902 * `intf_name` : Interface name to be shut/no shut
2903 * `ifaceaction` : Action, to shut/no shut interface,
2904 by default is False
2905 Usage
2906 -----
2907 dut = "r3"
2908 intf = "r3-r1-eth0"
2909 # Shut down interface
2910 shutdown_bringup_interface(tgen, dut, intf, False)
2911 # Bring up interface
2912 shutdown_bringup_interface(tgen, dut, intf, True)
2913 Returns
2914 -------
2915 errormsg(str) or True
2916 """
2917
2918 router_list = tgen.routers()
2919 if ifaceaction:
2920 logger.info("Bringing up interface {} : {}".format(dut, intf_name))
2921 else:
2922 logger.info("Shutting down interface {} : {}".format(dut, intf_name))
2923
2924 interface_set_status(router_list[dut], intf_name, ifaceaction)
2925
2926
2927 def addKernelRoute(
2928 tgen, router, intf, group_addr_range, next_hop=None, src=None, del_action=None
2929 ):
2930 """
2931 Add route to kernel
2932
2933 Parameters:
2934 -----------
2935 * `tgen` : Topogen object
2936 * `router`: router for which kernel routes needs to be added
2937 * `intf`: interface name, for which kernel routes needs to be added
2938 * `bindToAddress`: bind to <host>, an interface or multicast
2939 address
2940
2941 returns:
2942 --------
2943 errormsg or True
2944 """
2945
2946 logger.debug("Entering lib API: addKernelRoute()")
2947
2948 rnode = tgen.gears[router]
2949
2950 if type(group_addr_range) is not list:
2951 group_addr_range = [group_addr_range]
2952
2953 for grp_addr in group_addr_range:
2954
2955 addr_type = validate_ip_address(grp_addr)
2956 if addr_type == "ipv4":
2957 if next_hop is not None:
2958 cmd = "ip route add {} via {}".format(grp_addr, next_hop)
2959 else:
2960 cmd = "ip route add {} dev {}".format(grp_addr, intf)
2961 if del_action:
2962 cmd = "ip route del {}".format(grp_addr)
2963 verify_cmd = "ip route"
2964 elif addr_type == "ipv6":
2965 if intf and src:
2966 cmd = "ip -6 route add {} dev {} src {}".format(grp_addr, intf, src)
2967 else:
2968 cmd = "ip -6 route add {} via {}".format(grp_addr, next_hop)
2969 verify_cmd = "ip -6 route"
2970 if del_action:
2971 cmd = "ip -6 route del {}".format(grp_addr)
2972
2973 logger.info("[DUT: {}]: Running command: [{}]".format(router, cmd))
2974 output = rnode.run(cmd)
2975
2976 def check_in_kernel(rnode, verify_cmd, grp_addr, router):
2977 # Verifying if ip route added to kernel
2978 errormsg = None
2979 result = rnode.run(verify_cmd)
2980 logger.debug("{}\n{}".format(verify_cmd, result))
2981 if "/" in grp_addr:
2982 ip, mask = grp_addr.split("/")
2983 if mask == "32" or mask == "128":
2984 grp_addr = ip
2985 else:
2986 mask = "32" if addr_type == "ipv4" else "128"
2987
2988 if not re_search(r"{}".format(grp_addr), result) and mask != "0":
2989 errormsg = (
2990 "[DUT: {}]: Kernal route is not added for group"
2991 " address {} Config output: {}".format(
2992 router, grp_addr, output
2993 )
2994 )
2995
2996 return errormsg
2997
2998 test_func = functools.partial(
2999 check_in_kernel, rnode, verify_cmd, grp_addr, router
3000 )
3001 (result, out) = topotest.run_and_expect(test_func, None, count=20, wait=1)
3002 assert result, out
3003
3004 logger.debug("Exiting lib API: addKernelRoute()")
3005 return True
3006
3007
3008 def configure_vxlan(tgen, input_dict):
3009 """
3010 Add and configure vxlan
3011
3012 * `tgen`: tgen object
3013 * `input_dict` : data for vxlan config
3014
3015 Usage:
3016 ------
3017 input_dict= {
3018 "dcg2":{
3019 "vxlan":[{
3020 "vxlan_name": "vxlan75100",
3021 "vxlan_id": "75100",
3022 "dstport": 4789,
3023 "local_addr": "120.0.0.1",
3024 "learning": "no",
3025 "delete": True
3026 }]
3027 }
3028 }
3029
3030 configure_vxlan(tgen, input_dict)
3031
3032 Returns:
3033 -------
3034 True or errormsg
3035
3036 """
3037
3038 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
3039
3040 router_list = tgen.routers()
3041 for dut in input_dict.keys():
3042 rnode = router_list[dut]
3043
3044 if "vxlan" in input_dict[dut]:
3045 for vxlan_dict in input_dict[dut]["vxlan"]:
3046 cmd = "ip link "
3047
3048 del_vxlan = vxlan_dict.setdefault("delete", None)
3049 vxlan_names = vxlan_dict.setdefault("vxlan_name", [])
3050 vxlan_ids = vxlan_dict.setdefault("vxlan_id", [])
3051 dstport = vxlan_dict.setdefault("dstport", None)
3052 local_addr = vxlan_dict.setdefault("local_addr", None)
3053 learning = vxlan_dict.setdefault("learning", None)
3054
3055 config_data = []
3056 if vxlan_names and vxlan_ids:
3057 for vxlan_name, vxlan_id in zip(vxlan_names, vxlan_ids):
3058 cmd = "ip link"
3059
3060 if del_vxlan:
3061 cmd = "{} del {} type vxlan id {}".format(
3062 cmd, vxlan_name, vxlan_id
3063 )
3064 else:
3065 cmd = "{} add {} type vxlan id {}".format(
3066 cmd, vxlan_name, vxlan_id
3067 )
3068
3069 if dstport:
3070 cmd = "{} dstport {}".format(cmd, dstport)
3071
3072 if local_addr:
3073 ip_cmd = "ip addr add {} dev {}".format(
3074 local_addr, vxlan_name
3075 )
3076 if del_vxlan:
3077 ip_cmd = "ip addr del {} dev {}".format(
3078 local_addr, vxlan_name
3079 )
3080
3081 config_data.append(ip_cmd)
3082
3083 cmd = "{} local {}".format(cmd, local_addr)
3084
3085 if learning == "no":
3086 cmd = "{} nolearning".format(cmd)
3087
3088 elif learning == "yes":
3089 cmd = "{} learning".format(cmd)
3090
3091 config_data.append(cmd)
3092
3093 try:
3094 for _cmd in config_data:
3095 logger.info("[DUT: %s]: Running command: %s", dut, _cmd)
3096 rnode.run(_cmd)
3097
3098 except InvalidCLIError:
3099 # Traceback
3100 errormsg = traceback.format_exc()
3101 logger.error(errormsg)
3102 return errormsg
3103
3104 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
3105
3106 return True
3107
3108
3109 def configure_brctl(tgen, topo, input_dict):
3110 """
3111 Add and configure brctl
3112
3113 * `tgen`: tgen object
3114 * `input_dict` : data for brctl config
3115
3116 Usage:
3117 ------
3118 input_dict= {
3119 dut:{
3120 "brctl": [{
3121 "brctl_name": "br100",
3122 "addvxlan": "vxlan75100",
3123 "vrf": "RED",
3124 "stp": "off"
3125 }]
3126 }
3127 }
3128
3129 configure_brctl(tgen, topo, input_dict)
3130
3131 Returns:
3132 -------
3133 True or errormsg
3134
3135 """
3136
3137 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
3138
3139 router_list = tgen.routers()
3140 for dut in input_dict.keys():
3141 rnode = router_list[dut]
3142
3143 if "brctl" in input_dict[dut]:
3144 for brctl_dict in input_dict[dut]["brctl"]:
3145
3146 brctl_names = brctl_dict.setdefault("brctl_name", [])
3147 addvxlans = brctl_dict.setdefault("addvxlan", [])
3148 stp_values = brctl_dict.setdefault("stp", [])
3149 vrfs = brctl_dict.setdefault("vrf", [])
3150
3151 ip_cmd = "ip link set"
3152 for brctl_name, vxlan, vrf, stp in zip(
3153 brctl_names, addvxlans, vrfs, stp_values
3154 ):
3155
3156 ip_cmd_list = []
3157 cmd = "ip link add name {} type bridge stp_state {}".format(
3158 brctl_name, stp
3159 )
3160
3161 logger.info("[DUT: %s]: Running command: %s", dut, cmd)
3162 rnode.run(cmd)
3163
3164 ip_cmd_list.append("{} up dev {}".format(ip_cmd, brctl_name))
3165
3166 if vxlan:
3167 cmd = "{} dev {} master {}".format(ip_cmd, vxlan, brctl_name)
3168
3169 logger.info("[DUT: %s]: Running command: %s", dut, cmd)
3170 rnode.run(cmd)
3171
3172 ip_cmd_list.append("{} up dev {}".format(ip_cmd, vxlan))
3173
3174 if vrf:
3175 ip_cmd_list.append(
3176 "{} dev {} master {}".format(ip_cmd, brctl_name, vrf)
3177 )
3178
3179 for intf_name, data in topo["routers"][dut]["links"].items():
3180 if "vrf" not in data:
3181 continue
3182
3183 if data["vrf"] == vrf:
3184 ip_cmd_list.append(
3185 "{} up dev {}".format(ip_cmd, data["interface"])
3186 )
3187
3188 try:
3189 for _ip_cmd in ip_cmd_list:
3190 logger.info("[DUT: %s]: Running command: %s", dut, _ip_cmd)
3191 rnode.run(_ip_cmd)
3192
3193 except InvalidCLIError:
3194 # Traceback
3195 errormsg = traceback.format_exc()
3196 logger.error(errormsg)
3197 return errormsg
3198
3199 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
3200 return True
3201
3202
3203 def configure_interface_mac(tgen, input_dict):
3204 """
3205 Add and configure brctl
3206
3207 * `tgen`: tgen object
3208 * `input_dict` : data for mac config
3209
3210 input_mac= {
3211 "edge1":{
3212 "br75100": "00:80:48:BA:d1:00,
3213 "br75200": "00:80:48:BA:d1:00
3214 }
3215 }
3216
3217 configure_interface_mac(tgen, input_mac)
3218
3219 Returns:
3220 -------
3221 True or errormsg
3222
3223 """
3224
3225 router_list = tgen.routers()
3226 for dut in input_dict.keys():
3227 rnode = router_list[dut]
3228
3229 for intf, mac in input_dict[dut].items():
3230 cmd = "ip link set {} address {}".format(intf, mac)
3231 logger.info("[DUT: %s]: Running command: %s", dut, cmd)
3232
3233 try:
3234 result = rnode.run(cmd)
3235 if len(result) != 0:
3236 return result
3237
3238 except InvalidCLIError:
3239 # Traceback
3240 errormsg = traceback.format_exc()
3241 logger.error(errormsg)
3242 return errormsg
3243
3244 return True
3245
3246
3247 def socat_send_igmp_join_traffic(
3248 tgen,
3249 server,
3250 protocol_option,
3251 igmp_groups,
3252 send_from_intf,
3253 send_from_intf_ip=None,
3254 port=12345,
3255 reuseaddr=True,
3256 join=False,
3257 traffic=False,
3258 ):
3259 """
3260 API to send IGMP join using SOCAT tool
3261
3262 Parameters:
3263 -----------
3264 * `tgen` : Topogen object
3265 * `server`: iperf server, from where IGMP join would be sent
3266 * `protocol_option`: Protocol options, ex: UDP6-RECV
3267 * `igmp_groups`: IGMP group for which join has to be sent
3268 * `send_from_intf`: Interface from which join would be sent
3269 * `send_from_intf_ip`: Interface IP, default is None
3270 * `port`: Port to be used, default is 12345
3271 * `reuseaddr`: True|False, bydefault True
3272 * `join`: If join needs to be sent
3273 * `traffic`: If traffic needs to be sent
3274
3275 returns:
3276 --------
3277 errormsg or True
3278 """
3279
3280 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
3281
3282 rnode = tgen.routers()[server]
3283 socat_cmd = "socat -u "
3284
3285 # UDP4/TCP4/UDP6/UDP6-RECV
3286 if protocol_option:
3287 socat_cmd += "{}".format(protocol_option)
3288
3289 if port:
3290 socat_cmd += ":{},".format(port)
3291
3292 if reuseaddr:
3293 socat_cmd += "{},".format("reuseaddr")
3294
3295 # Group address range to cover
3296 if igmp_groups:
3297 if not isinstance(igmp_groups, list):
3298 igmp_groups = [igmp_groups]
3299
3300 for igmp_group in igmp_groups:
3301 if join:
3302 join_traffic_option = "ipv6-join-group"
3303 elif traffic:
3304 join_traffic_option = "ipv6-join-group-source"
3305
3306 if send_from_intf and not send_from_intf_ip:
3307 socat_cmd += "{}='[{}]:{}'".format(
3308 join_traffic_option, igmp_group, send_from_intf
3309 )
3310 else:
3311 socat_cmd += "{}='[{}]:{}:[{}]'".format(
3312 join_traffic_option, igmp_group, send_from_intf, send_from_intf_ip
3313 )
3314
3315 socat_cmd += " STDOUT"
3316
3317 socat_cmd += " &>{}/socat.logs &".format(tgen.logdir)
3318
3319 # Run socat command to send IGMP join
3320 logger.info("[DUT: {}]: Running command: [{}]".format(server, socat_cmd))
3321 output = rnode.run("set +m; {} sleep 0.5".format(socat_cmd))
3322
3323 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
3324 return True
3325
3326
3327 #############################################
3328 # Verification APIs
3329 #############################################
3330 @retry(retry_timeout=40)
3331 def verify_rib(
3332 tgen,
3333 addr_type,
3334 dut,
3335 input_dict,
3336 next_hop=None,
3337 protocol=None,
3338 tag=None,
3339 metric=None,
3340 fib=None,
3341 count_only=False,
3342 ):
3343 """
3344 Data will be read from input_dict or input JSON file, API will generate
3345 same prefixes, which were redistributed by either create_static_routes() or
3346 advertise_networks_using_network_command() and do will verify next_hop and
3347 each prefix/routes is present in "show ip/ipv6 route {bgp/stataic} json"
3348 command o/p.
3349
3350 Parameters
3351 ----------
3352 * `tgen` : topogen object
3353 * `addr_type` : ip type, ipv4/ipv6
3354 * `dut`: Device Under Test, for which user wants to test the data
3355 * `input_dict` : input dict, has details of static routes
3356 * `next_hop`[optional]: next_hop which needs to be verified,
3357 default: static
3358 * `protocol`[optional]: protocol, default = None
3359 * `count_only`[optional]: count of nexthops only, not specific addresses,
3360 default = False
3361
3362 Usage
3363 -----
3364 # RIB can be verified for static routes OR network advertised using
3365 network command. Following are input_dicts to create static routes
3366 and advertise networks using network command. Any one of the input_dict
3367 can be passed to verify_rib() to verify routes in DUT"s RIB.
3368
3369 # Creating static routes for r1
3370 input_dict = {
3371 "r1": {
3372 "static_routes": [{"network": "10.0.20.1/32", "no_of_ip": 9, \
3373 "admin_distance": 100, "next_hop": "10.0.0.2", "tag": 4001}]
3374 }}
3375 # Advertising networks using network command in router r1
3376 input_dict = {
3377 "r1": {
3378 "advertise_networks": [{"start_ip": "20.0.0.0/32",
3379 "no_of_network": 10},
3380 {"start_ip": "30.0.0.0/32"}]
3381 }}
3382 # Verifying ipv4 routes in router r1 learned via BGP
3383 dut = "r2"
3384 protocol = "bgp"
3385 result = verify_rib(tgen, "ipv4", dut, input_dict, protocol = protocol)
3386
3387 Returns
3388 -------
3389 errormsg(str) or True
3390 """
3391
3392 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
3393
3394 router_list = tgen.routers()
3395 additional_nexthops_in_required_nhs = []
3396 found_hops = []
3397 for routerInput in input_dict.keys():
3398 for router, rnode in router_list.items():
3399 if router != dut:
3400 continue
3401
3402 logger.info("Checking router %s RIB:", router)
3403
3404 # Verifying RIB routes
3405 if addr_type == "ipv4":
3406 command = "show ip route"
3407 else:
3408 command = "show ipv6 route"
3409
3410 found_routes = []
3411 missing_routes = []
3412
3413 if "static_routes" in input_dict[routerInput]:
3414 static_routes = input_dict[routerInput]["static_routes"]
3415
3416 for static_route in static_routes:
3417 if "vrf" in static_route and static_route["vrf"] is not None:
3418
3419 logger.info(
3420 "[DUT: {}]: Verifying routes for VRF:"
3421 " {}".format(router, static_route["vrf"])
3422 )
3423
3424 cmd = "{} vrf {}".format(command, static_route["vrf"])
3425
3426 else:
3427 cmd = "{}".format(command)
3428
3429 if protocol:
3430 cmd = "{} {}".format(cmd, protocol)
3431
3432 cmd = "{} json".format(cmd)
3433
3434 rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True)
3435
3436 # Verifying output dictionary rib_routes_json is not empty
3437 if bool(rib_routes_json) is False:
3438 errormsg = "No route found in rib of router {}..".format(router)
3439 return errormsg
3440
3441 network = static_route["network"]
3442 if "no_of_ip" in static_route:
3443 no_of_ip = static_route["no_of_ip"]
3444 else:
3445 no_of_ip = 1
3446
3447 if "tag" in static_route:
3448 _tag = static_route["tag"]
3449 else:
3450 _tag = None
3451
3452 # Generating IPs for verification
3453 ip_list = generate_ips(network, no_of_ip)
3454 st_found = False
3455 nh_found = False
3456
3457 for st_rt in ip_list:
3458 st_rt = str(
3459 ipaddress.ip_network(frr_unicode(st_rt), strict=False)
3460 )
3461 _addr_type = validate_ip_address(st_rt)
3462 if _addr_type != addr_type:
3463 continue
3464
3465 if st_rt in rib_routes_json:
3466 st_found = True
3467 found_routes.append(st_rt)
3468
3469 if "queued" in rib_routes_json[st_rt][0]:
3470 errormsg = "Route {} is queued\n".format(st_rt)
3471 return errormsg
3472
3473 if fib and next_hop:
3474 if type(next_hop) is not list:
3475 next_hop = [next_hop]
3476
3477 for mnh in range(0, len(rib_routes_json[st_rt])):
3478 if not "selected" in rib_routes_json[st_rt][mnh]:
3479 continue
3480
3481 if (
3482 "fib"
3483 in rib_routes_json[st_rt][mnh]["nexthops"][0]
3484 ):
3485 found_hops.append(
3486 [
3487 rib_r["ip"]
3488 for rib_r in rib_routes_json[st_rt][
3489 mnh
3490 ]["nexthops"]
3491 ]
3492 )
3493
3494 if found_hops[0]:
3495 missing_list_of_nexthops = set(
3496 found_hops[0]
3497 ).difference(next_hop)
3498 additional_nexthops_in_required_nhs = set(
3499 next_hop
3500 ).difference(found_hops[0])
3501
3502 if additional_nexthops_in_required_nhs:
3503 logger.info(
3504 "Nexthop "
3505 "%s is not active for route %s in "
3506 "RIB of router %s\n",
3507 additional_nexthops_in_required_nhs,
3508 st_rt,
3509 dut,
3510 )
3511 errormsg = (
3512 "Nexthop {} is not active"
3513 " for route {} in RIB of router"
3514 " {}\n".format(
3515 additional_nexthops_in_required_nhs,
3516 st_rt,
3517 dut,
3518 )
3519 )
3520 return errormsg
3521 else:
3522 nh_found = True
3523
3524 elif next_hop and fib is None:
3525 if type(next_hop) is not list:
3526 next_hop = [next_hop]
3527 found_hops = [
3528 rib_r["ip"]
3529 for rib_r in rib_routes_json[st_rt][0]["nexthops"]
3530 if "ip" in rib_r
3531 ]
3532
3533 # If somehow key "ip" is not found in nexthops JSON
3534 # then found_hops would be 0, this particular
3535 # situation will be handled here
3536 if not len(found_hops):
3537 errormsg = (
3538 "Nexthop {} is Missing for "
3539 "route {} in RIB of router {}\n".format(
3540 next_hop,
3541 st_rt,
3542 dut,
3543 )
3544 )
3545 return errormsg
3546
3547 # Check only the count of nexthops
3548 if count_only:
3549 if len(next_hop) == len(found_hops):
3550 nh_found = True
3551 else:
3552 errormsg = (
3553 "Nexthops are missing for "
3554 "route {} in RIB of router {}: "
3555 "expected {}, found {}\n".format(
3556 st_rt,
3557 dut,
3558 len(next_hop),
3559 len(found_hops),
3560 )
3561 )
3562 return errormsg
3563
3564 # Check the actual nexthops
3565 elif found_hops:
3566 missing_list_of_nexthops = set(
3567 found_hops
3568 ).difference(next_hop)
3569 additional_nexthops_in_required_nhs = set(
3570 next_hop
3571 ).difference(found_hops)
3572
3573 if additional_nexthops_in_required_nhs:
3574 logger.info(
3575 "Missing nexthop %s for route"
3576 " %s in RIB of router %s\n",
3577 additional_nexthops_in_required_nhs,
3578 st_rt,
3579 dut,
3580 )
3581 errormsg = (
3582 "Nexthop {} is Missing for "
3583 "route {} in RIB of router {}\n".format(
3584 additional_nexthops_in_required_nhs,
3585 st_rt,
3586 dut,
3587 )
3588 )
3589 return errormsg
3590 else:
3591 nh_found = True
3592
3593 if tag:
3594 if "tag" not in rib_routes_json[st_rt][0]:
3595 errormsg = (
3596 "[DUT: {}]: tag is not"
3597 " present for"
3598 " route {} in RIB \n".format(dut, st_rt)
3599 )
3600 return errormsg
3601
3602 if _tag != rib_routes_json[st_rt][0]["tag"]:
3603 errormsg = (
3604 "[DUT: {}]: tag value {}"
3605 " is not matched for"
3606 " route {} in RIB \n".format(
3607 dut,
3608 _tag,
3609 st_rt,
3610 )
3611 )
3612 return errormsg
3613
3614 if metric is not None:
3615 if "metric" not in rib_routes_json[st_rt][0]:
3616 errormsg = (
3617 "[DUT: {}]: metric is"
3618 " not present for"
3619 " route {} in RIB \n".format(dut, st_rt)
3620 )
3621 return errormsg
3622
3623 if metric != rib_routes_json[st_rt][0]["metric"]:
3624 errormsg = (
3625 "[DUT: {}]: metric value "
3626 "{} is not matched for "
3627 "route {} in RIB \n".format(
3628 dut,
3629 metric,
3630 st_rt,
3631 )
3632 )
3633 return errormsg
3634
3635 else:
3636 missing_routes.append(st_rt)
3637
3638 if nh_found:
3639 logger.info(
3640 "[DUT: {}]: Found next_hop {} for"
3641 " RIB routes: {}".format(router, next_hop, found_routes)
3642 )
3643
3644 if len(missing_routes) > 0:
3645 errormsg = "[DUT: {}]: Missing route in RIB, " "routes: {}".format(
3646 dut, missing_routes
3647 )
3648 return errormsg
3649
3650 if found_routes:
3651 logger.info(
3652 "[DUT: %s]: Verified routes in RIB, found" " routes are: %s\n",
3653 dut,
3654 found_routes,
3655 )
3656
3657 continue
3658
3659 if "bgp" in input_dict[routerInput]:
3660 if (
3661 "advertise_networks"
3662 not in input_dict[routerInput]["bgp"]["address_family"][addr_type][
3663 "unicast"
3664 ]
3665 ):
3666 continue
3667
3668 found_routes = []
3669 missing_routes = []
3670 advertise_network = input_dict[routerInput]["bgp"]["address_family"][
3671 addr_type
3672 ]["unicast"]["advertise_networks"]
3673
3674 # Continue if there are no network advertise
3675 if len(advertise_network) == 0:
3676 continue
3677
3678 for advertise_network_dict in advertise_network:
3679 if "vrf" in advertise_network_dict:
3680 cmd = "{} vrf {} json".format(
3681 command, advertise_network_dict["vrf"]
3682 )
3683 else:
3684 cmd = "{} json".format(command)
3685
3686 rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True)
3687
3688 # Verifying output dictionary rib_routes_json is not empty
3689 if bool(rib_routes_json) is False:
3690 errormsg = "No route found in rib of router {}..".format(router)
3691 return errormsg
3692
3693 start_ip = advertise_network_dict["network"]
3694 if "no_of_network" in advertise_network_dict:
3695 no_of_network = advertise_network_dict["no_of_network"]
3696 else:
3697 no_of_network = 1
3698
3699 # Generating IPs for verification
3700 ip_list = generate_ips(start_ip, no_of_network)
3701 st_found = False
3702 nh_found = False
3703
3704 for st_rt in ip_list:
3705 st_rt = str(ipaddress.ip_network(frr_unicode(st_rt), strict=False))
3706
3707 _addr_type = validate_ip_address(st_rt)
3708 if _addr_type != addr_type:
3709 continue
3710
3711 if st_rt in rib_routes_json:
3712 st_found = True
3713 found_routes.append(st_rt)
3714
3715 if "queued" in rib_routes_json[st_rt][0]:
3716 errormsg = "Route {} is queued\n".format(st_rt)
3717 return errormsg
3718
3719 if next_hop:
3720 if type(next_hop) is not list:
3721 next_hop = [next_hop]
3722
3723 count = 0
3724 for nh in next_hop:
3725 for nh_dict in rib_routes_json[st_rt][0]["nexthops"]:
3726 if nh_dict["ip"] != nh:
3727 continue
3728 else:
3729 count += 1
3730
3731 if count == len(next_hop):
3732 nh_found = True
3733 else:
3734 errormsg = (
3735 "Nexthop {} is Missing"
3736 " for route {} in "
3737 "RIB of router {}\n".format(next_hop, st_rt, dut)
3738 )
3739 return errormsg
3740 else:
3741 missing_routes.append(st_rt)
3742
3743 if nh_found:
3744 logger.info(
3745 "Found next_hop {} for all routes in RIB"
3746 " of router {}\n".format(next_hop, dut)
3747 )
3748
3749 if len(missing_routes) > 0:
3750 errormsg = (
3751 "Missing {} route in RIB of router {}, "
3752 "routes: {} \n".format(addr_type, dut, missing_routes)
3753 )
3754 return errormsg
3755
3756 if found_routes:
3757 logger.info(
3758 "Verified {} routes in router {} RIB, found"
3759 " routes are: {}\n".format(addr_type, dut, found_routes)
3760 )
3761
3762 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
3763 return True
3764
3765
3766 @retry(retry_timeout=12)
3767 def verify_fib_routes(tgen, addr_type, dut, input_dict, next_hop=None):
3768 """
3769 Data will be read from input_dict or input JSON file, API will generate
3770 same prefixes, which were redistributed by either create_static_routes() or
3771 advertise_networks_using_network_command() and will verify next_hop and
3772 each prefix/routes is present in "show ip/ipv6 fib json"
3773 command o/p.
3774
3775 Parameters
3776 ----------
3777 * `tgen` : topogen object
3778 * `addr_type` : ip type, ipv4/ipv6
3779 * `dut`: Device Under Test, for which user wants to test the data
3780 * `input_dict` : input dict, has details of static routes
3781 * `next_hop`[optional]: next_hop which needs to be verified,
3782 default: static
3783
3784 Usage
3785 -----
3786 input_routes_r1 = {
3787 "r1": {
3788 "static_routes": [{
3789 "network": ["1.1.1.1/32],
3790 "next_hop": "Null0",
3791 "vrf": "RED"
3792 }]
3793 }
3794 }
3795 result = result = verify_fib_routes(tgen, "ipv4, "r1", input_routes_r1)
3796
3797 Returns
3798 -------
3799 errormsg(str) or True
3800 """
3801
3802 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
3803
3804 router_list = tgen.routers()
3805 if dut not in router_list:
3806 return
3807
3808 for routerInput in input_dict.keys():
3809 # XXX replace with router = dut; rnode = router_list[dut]
3810 for router, rnode in router_list.items():
3811 if router != dut:
3812 continue
3813
3814 logger.info("Checking router %s FIB routes:", router)
3815
3816 # Verifying RIB routes
3817 if addr_type == "ipv4":
3818 command = "show ip fib"
3819 else:
3820 command = "show ipv6 fib"
3821
3822 found_routes = []
3823 missing_routes = []
3824
3825 if "static_routes" in input_dict[routerInput]:
3826 static_routes = input_dict[routerInput]["static_routes"]
3827
3828 for static_route in static_routes:
3829 if "vrf" in static_route and static_route["vrf"] is not None:
3830
3831 logger.info(
3832 "[DUT: {}]: Verifying routes for VRF:"
3833 " {}".format(router, static_route["vrf"])
3834 )
3835
3836 cmd = "{} vrf {}".format(command, static_route["vrf"])
3837
3838 else:
3839 cmd = "{}".format(command)
3840
3841 cmd = "{} json".format(cmd)
3842
3843 rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True)
3844
3845 # Verifying output dictionary rib_routes_json is not empty
3846 if bool(rib_routes_json) is False:
3847 errormsg = "[DUT: {}]: No route found in fib".format(router)
3848 return errormsg
3849
3850 network = static_route["network"]
3851 if "no_of_ip" in static_route:
3852 no_of_ip = static_route["no_of_ip"]
3853 else:
3854 no_of_ip = 1
3855
3856 # Generating IPs for verification
3857 ip_list = generate_ips(network, no_of_ip)
3858 st_found = False
3859 nh_found = False
3860
3861 for st_rt in ip_list:
3862 st_rt = str(
3863 ipaddress.ip_network(frr_unicode(st_rt), strict=False)
3864 )
3865 _addr_type = validate_ip_address(st_rt)
3866 if _addr_type != addr_type:
3867 continue
3868
3869 if st_rt in rib_routes_json:
3870 st_found = True
3871 found_routes.append(st_rt)
3872
3873 if next_hop:
3874 if type(next_hop) is not list:
3875 next_hop = [next_hop]
3876
3877 count = 0
3878 for nh in next_hop:
3879 for nh_dict in rib_routes_json[st_rt][0][
3880 "nexthops"
3881 ]:
3882 if nh_dict["ip"] != nh:
3883 continue
3884 else:
3885 count += 1
3886
3887 if count == len(next_hop):
3888 nh_found = True
3889 else:
3890 missing_routes.append(st_rt)
3891 errormsg = (
3892 "Nexthop {} is Missing"
3893 " for route {} in "
3894 "RIB of router {}\n".format(
3895 next_hop, st_rt, dut
3896 )
3897 )
3898 return errormsg
3899
3900 else:
3901 missing_routes.append(st_rt)
3902
3903 if len(missing_routes) > 0:
3904 errormsg = "[DUT: {}]: Missing route in FIB:" " {}".format(
3905 dut, missing_routes
3906 )
3907 return errormsg
3908
3909 if nh_found:
3910 logger.info(
3911 "Found next_hop {} for all routes in RIB"
3912 " of router {}\n".format(next_hop, dut)
3913 )
3914
3915 if found_routes:
3916 logger.info(
3917 "[DUT: %s]: Verified routes in FIB, found" " routes are: %s\n",
3918 dut,
3919 found_routes,
3920 )
3921
3922 continue
3923
3924 if "bgp" in input_dict[routerInput]:
3925 if (
3926 "advertise_networks"
3927 not in input_dict[routerInput]["bgp"]["address_family"][addr_type][
3928 "unicast"
3929 ]
3930 ):
3931 continue
3932
3933 found_routes = []
3934 missing_routes = []
3935 advertise_network = input_dict[routerInput]["bgp"]["address_family"][
3936 addr_type
3937 ]["unicast"]["advertise_networks"]
3938
3939 # Continue if there are no network advertise
3940 if len(advertise_network) == 0:
3941 continue
3942
3943 for advertise_network_dict in advertise_network:
3944 if "vrf" in advertise_network_dict:
3945 cmd = "{} vrf {} json".format(command, static_route["vrf"])
3946 else:
3947 cmd = "{} json".format(command)
3948
3949 rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True)
3950
3951 # Verifying output dictionary rib_routes_json is not empty
3952 if bool(rib_routes_json) is False:
3953 errormsg = "No route found in rib of router {}..".format(router)
3954 return errormsg
3955
3956 start_ip = advertise_network_dict["network"]
3957 if "no_of_network" in advertise_network_dict:
3958 no_of_network = advertise_network_dict["no_of_network"]
3959 else:
3960 no_of_network = 1
3961
3962 # Generating IPs for verification
3963 ip_list = generate_ips(start_ip, no_of_network)
3964 st_found = False
3965 nh_found = False
3966
3967 for st_rt in ip_list:
3968 st_rt = str(ipaddress.ip_network(frr_unicode(st_rt), strict=False))
3969
3970 _addr_type = validate_ip_address(st_rt)
3971 if _addr_type != addr_type:
3972 continue
3973
3974 if st_rt in rib_routes_json:
3975 st_found = True
3976 found_routes.append(st_rt)
3977
3978 if next_hop:
3979 if type(next_hop) is not list:
3980 next_hop = [next_hop]
3981
3982 count = 0
3983 for nh in next_hop:
3984 for nh_dict in rib_routes_json[st_rt][0]["nexthops"]:
3985 if nh_dict["ip"] != nh:
3986 continue
3987 else:
3988 count += 1
3989
3990 if count == len(next_hop):
3991 nh_found = True
3992 else:
3993 missing_routes.append(st_rt)
3994 errormsg = (
3995 "Nexthop {} is Missing"
3996 " for route {} in "
3997 "RIB of router {}\n".format(next_hop, st_rt, dut)
3998 )
3999 return errormsg
4000 else:
4001 missing_routes.append(st_rt)
4002
4003 if len(missing_routes) > 0:
4004 errormsg = "[DUT: {}]: Missing route in FIB: " "{} \n".format(
4005 dut, missing_routes
4006 )
4007 return errormsg
4008
4009 if nh_found:
4010 logger.info(
4011 "Found next_hop {} for all routes in RIB"
4012 " of router {}\n".format(next_hop, dut)
4013 )
4014
4015 if found_routes:
4016 logger.info(
4017 "[DUT: {}]: Verified routes FIB"
4018 ", found routes are: {}\n".format(dut, found_routes)
4019 )
4020
4021 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
4022 return True
4023
4024
4025 def verify_admin_distance_for_static_routes(tgen, input_dict):
4026 """
4027 API to verify admin distance for static routes as defined in input_dict/
4028 input JSON by running show ip/ipv6 route json command.
4029 Parameter
4030 ---------
4031 * `tgen` : topogen object
4032 * `input_dict`: having details like - for which router and static routes
4033 admin dsitance needs to be verified
4034 Usage
4035 -----
4036 # To verify admin distance is 10 for prefix 10.0.20.1/32 having next_hop
4037 10.0.0.2 in router r1
4038 input_dict = {
4039 "r1": {
4040 "static_routes": [{
4041 "network": "10.0.20.1/32",
4042 "admin_distance": 10,
4043 "next_hop": "10.0.0.2"
4044 }]
4045 }
4046 }
4047 result = verify_admin_distance_for_static_routes(tgen, input_dict)
4048 Returns
4049 -------
4050 errormsg(str) or True
4051 """
4052
4053 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
4054
4055 router_list = tgen.routers()
4056 for router in input_dict.keys():
4057 if router not in router_list:
4058 continue
4059 rnode = router_list[router]
4060
4061 for static_route in input_dict[router]["static_routes"]:
4062 addr_type = validate_ip_address(static_route["network"])
4063 # Command to execute
4064 if addr_type == "ipv4":
4065 command = "show ip route json"
4066 else:
4067 command = "show ipv6 route json"
4068 show_ip_route_json = run_frr_cmd(rnode, command, isjson=True)
4069
4070 logger.info(
4071 "Verifying admin distance for static route %s" " under dut %s:",
4072 static_route,
4073 router,
4074 )
4075 network = static_route["network"]
4076 next_hop = static_route["next_hop"]
4077 admin_distance = static_route["admin_distance"]
4078 route_data = show_ip_route_json[network][0]
4079 if network in show_ip_route_json:
4080 if route_data["nexthops"][0]["ip"] == next_hop:
4081 if route_data["distance"] != admin_distance:
4082 errormsg = (
4083 "Verification failed: admin distance"
4084 " for static route {} under dut {},"
4085 " found:{} but expected:{}".format(
4086 static_route,
4087 router,
4088 route_data["distance"],
4089 admin_distance,
4090 )
4091 )
4092 return errormsg
4093 else:
4094 logger.info(
4095 "Verification successful: admin"
4096 " distance for static route %s under"
4097 " dut %s, found:%s",
4098 static_route,
4099 router,
4100 route_data["distance"],
4101 )
4102
4103 else:
4104 errormsg = (
4105 "Static route {} not found in "
4106 "show_ip_route_json for dut {}".format(network, router)
4107 )
4108 return errormsg
4109
4110 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
4111 return True
4112
4113
4114 def verify_prefix_lists(tgen, input_dict):
4115 """
4116 Running "show ip prefix-list" command and verifying given prefix-list
4117 is present in router.
4118 Parameters
4119 ----------
4120 * `tgen` : topogen object
4121 * `input_dict`: data to verify prefix lists
4122 Usage
4123 -----
4124 # To verify pf_list_1 is present in router r1
4125 input_dict = {
4126 "r1": {
4127 "prefix_lists": ["pf_list_1"]
4128 }}
4129 result = verify_prefix_lists("ipv4", input_dict, tgen)
4130 Returns
4131 -------
4132 errormsg(str) or True
4133 """
4134
4135 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
4136
4137 router_list = tgen.routers()
4138 for router in input_dict.keys():
4139 if router not in router_list:
4140 continue
4141
4142 rnode = router_list[router]
4143
4144 # Show ip prefix list
4145 show_prefix_list = run_frr_cmd(rnode, "show ip prefix-list")
4146
4147 # Verify Prefix list is deleted
4148 prefix_lists_addr = input_dict[router]["prefix_lists"]
4149 for addr_type in prefix_lists_addr:
4150 if not check_address_types(addr_type):
4151 continue
4152 # show ip prefix list
4153 if addr_type == "ipv4":
4154 cmd = "show ip prefix-list"
4155 else:
4156 cmd = "show {} prefix-list".format(addr_type)
4157 show_prefix_list = run_frr_cmd(rnode, cmd)
4158 for prefix_list in prefix_lists_addr[addr_type].keys():
4159 if prefix_list in show_prefix_list:
4160 errormsg = (
4161 "Prefix list {} is/are present in the router"
4162 " {}".format(prefix_list, router)
4163 )
4164 return errormsg
4165
4166 logger.info(
4167 "Prefix list %s is/are not present in the router" " from router %s",
4168 prefix_list,
4169 router,
4170 )
4171
4172 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
4173 return True
4174
4175
4176 @retry(retry_timeout=12)
4177 def verify_route_maps(tgen, input_dict):
4178 """
4179 Running "show route-map" command and verifying given route-map
4180 is present in router.
4181 Parameters
4182 ----------
4183 * `tgen` : topogen object
4184 * `input_dict`: data to verify prefix lists
4185 Usage
4186 -----
4187 # To verify rmap_1 and rmap_2 are present in router r1
4188 input_dict = {
4189 "r1": {
4190 "route_maps": ["rmap_1", "rmap_2"]
4191 }
4192 }
4193 result = verify_route_maps(tgen, input_dict)
4194 Returns
4195 -------
4196 errormsg(str) or True
4197 """
4198
4199 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
4200
4201 router_list = tgen.routers()
4202 for router in input_dict.keys():
4203 if router not in router_list:
4204 continue
4205
4206 rnode = router_list[router]
4207 # Show ip route-map
4208 show_route_maps = rnode.vtysh_cmd("show route-map")
4209
4210 # Verify route-map is deleted
4211 route_maps = input_dict[router]["route_maps"]
4212 for route_map in route_maps:
4213 if route_map in show_route_maps:
4214 errormsg = "Route map {} is not deleted from router" " {}".format(
4215 route_map, router
4216 )
4217 return errormsg
4218
4219 logger.info(
4220 "Route map %s is/are deleted successfully from" " router %s",
4221 route_maps,
4222 router,
4223 )
4224
4225 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
4226 return True
4227
4228
4229 @retry(retry_timeout=16)
4230 def verify_bgp_community(tgen, addr_type, router, network, input_dict=None):
4231 """
4232 API to veiryf BGP large community is attached in route for any given
4233 DUT by running "show bgp ipv4/6 {route address} json" command.
4234 Parameters
4235 ----------
4236 * `tgen`: topogen object
4237 * `addr_type` : ip type, ipv4/ipv6
4238 * `dut`: Device Under Test
4239 * `network`: network for which set criteria needs to be verified
4240 * `input_dict`: having details like - for which router, community and
4241 values needs to be verified
4242 Usage
4243 -----
4244 networks = ["200.50.2.0/32"]
4245 input_dict = {
4246 "largeCommunity": "2:1:1 2:2:2 2:3:3 2:4:4 2:5:5"
4247 }
4248 result = verify_bgp_community(tgen, "ipv4", dut, network, input_dict=None)
4249 Returns
4250 -------
4251 errormsg(str) or True
4252 """
4253
4254 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
4255 router_list = tgen.routers()
4256 if router not in router_list:
4257 return False
4258
4259 rnode = router_list[router]
4260
4261 logger.debug(
4262 "Verifying BGP community attributes on dut %s: for %s " "network %s",
4263 router,
4264 addr_type,
4265 network,
4266 )
4267
4268 for net in network:
4269 cmd = "show bgp {} {} json".format(addr_type, net)
4270 show_bgp_json = rnode.vtysh_cmd(cmd, isjson=True)
4271 logger.info(show_bgp_json)
4272 if "paths" not in show_bgp_json:
4273 return "Prefix {} not found in BGP table of router: {}".format(net, router)
4274
4275 as_paths = show_bgp_json["paths"]
4276 found = False
4277 for i in range(len(as_paths)):
4278 if (
4279 "largeCommunity" in show_bgp_json["paths"][i]
4280 or "community" in show_bgp_json["paths"][i]
4281 ):
4282 found = True
4283 logger.info(
4284 "Large Community attribute is found for route:" " %s in router: %s",
4285 net,
4286 router,
4287 )
4288 if input_dict is not None:
4289 for criteria, comm_val in input_dict.items():
4290 show_val = show_bgp_json["paths"][i][criteria]["string"]
4291 if comm_val == show_val:
4292 logger.info(
4293 "Verifying BGP %s for prefix: %s"
4294 " in router: %s, found expected"
4295 " value: %s",
4296 criteria,
4297 net,
4298 router,
4299 comm_val,
4300 )
4301 else:
4302 errormsg = (
4303 "Failed: Verifying BGP attribute"
4304 " {} for route: {} in router: {}"
4305 ", expected value: {} but found"
4306 ": {}".format(criteria, net, router, comm_val, show_val)
4307 )
4308 return errormsg
4309
4310 if not found:
4311 errormsg = (
4312 "Large Community attribute is not found for route: "
4313 "{} in router: {} ".format(net, router)
4314 )
4315 return errormsg
4316
4317 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
4318 return True
4319
4320
4321 def get_ipv6_linklocal_address(topo, node, intf):
4322 """
4323 API to get the link local ipv6 address of a particular interface
4324
4325 Parameters
4326 ----------
4327 * `node`: node on which link local ip to be fetched.
4328 * `intf` : interface for which link local ip needs to be returned.
4329 * `topo` : base topo
4330
4331 Usage
4332 -----
4333 result = get_ipv6_linklocal_address(topo, 'r1', 'r2')
4334
4335 Returns link local ip of interface between r1 and r2.
4336
4337 Returns
4338 -------
4339 1) link local ipv6 address from the interface
4340 2) errormsg - when link local ip not found
4341 """
4342 tgen = get_topogen()
4343 ext_nh = tgen.net[node].get_ipv6_linklocal()
4344 req_nh = topo[node]["links"][intf]["interface"]
4345 llip = None
4346 for llips in ext_nh:
4347 if llips[0] == req_nh:
4348 llip = llips[1]
4349 logger.info("Link local ip found = %s", llip)
4350 return llip
4351
4352 errormsg = "Failed: Link local ip not found on router {}, " "interface {}".format(
4353 node, intf
4354 )
4355
4356 return errormsg
4357
4358
4359 def verify_create_community_list(tgen, input_dict):
4360 """
4361 API is to verify if large community list is created for any given DUT in
4362 input_dict by running "sh bgp large-community-list {"comm_name"} detail"
4363 command.
4364 Parameters
4365 ----------
4366 * `tgen`: topogen object
4367 * `input_dict`: having details like - for which router, large community
4368 needs to be verified
4369 Usage
4370 -----
4371 input_dict = {
4372 "r1": {
4373 "large-community-list": {
4374 "standard": {
4375 "Test1": [{"action": "PERMIT", "attribute":\
4376 ""}]
4377 }}}}
4378 result = verify_create_community_list(tgen, input_dict)
4379 Returns
4380 -------
4381 errormsg(str) or True
4382 """
4383
4384 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
4385
4386 router_list = tgen.routers()
4387 for router in input_dict.keys():
4388 if router not in router_list:
4389 continue
4390
4391 rnode = router_list[router]
4392
4393 logger.info("Verifying large-community is created for dut %s:", router)
4394
4395 for comm_data in input_dict[router]["bgp_community_lists"]:
4396 comm_name = comm_data["name"]
4397 comm_type = comm_data["community_type"]
4398 show_bgp_community = run_frr_cmd(
4399 rnode, "show bgp large-community-list {} detail".format(comm_name)
4400 )
4401
4402 # Verify community list and type
4403 if comm_name in show_bgp_community and comm_type in show_bgp_community:
4404 logger.info(
4405 "BGP %s large-community-list %s is" " created", comm_type, comm_name
4406 )
4407 else:
4408 errormsg = "BGP {} large-community-list {} is not" " created".format(
4409 comm_type, comm_name
4410 )
4411 return errormsg
4412
4413 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
4414 return True
4415
4416
4417 def verify_cli_json(tgen, input_dict):
4418 """
4419 API to verify if JSON is available for clis
4420 command.
4421 Parameters
4422 ----------
4423 * `tgen`: topogen object
4424 * `input_dict`: CLIs for which JSON needs to be verified
4425 Usage
4426 -----
4427 input_dict = {
4428 "edge1":{
4429 "cli": ["show evpn vni detail", show evpn rmac vni all]
4430 }
4431 }
4432
4433 result = verify_cli_json(tgen, input_dict)
4434
4435 Returns
4436 -------
4437 errormsg(str) or True
4438 """
4439
4440 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
4441 for dut in input_dict.keys():
4442 rnode = tgen.gears[dut]
4443
4444 for cli in input_dict[dut]["cli"]:
4445 logger.info(
4446 "[DUT: %s]: Verifying JSON is available for " "CLI %s :", dut, cli
4447 )
4448
4449 test_cli = "{} json".format(cli)
4450 ret_json = rnode.vtysh_cmd(test_cli, isjson=True)
4451 if not bool(ret_json):
4452 errormsg = "CLI: %s, JSON format is not available" % (cli)
4453 return errormsg
4454 elif "unknown" in ret_json or "Unknown" in ret_json:
4455 errormsg = "CLI: %s, JSON format is not available" % (cli)
4456 return errormsg
4457 else:
4458 logger.info(
4459 "CLI : %s JSON format is available: " "\n %s", cli, ret_json
4460 )
4461
4462 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
4463
4464 return True
4465
4466
4467 @retry(retry_timeout=12)
4468 def verify_evpn_vni(tgen, input_dict):
4469 """
4470 API to verify evpn vni details using "show evpn vni detail json"
4471 command.
4472
4473 Parameters
4474 ----------
4475 * `tgen`: topogen object
4476 * `input_dict`: having details like - for which router, evpn details
4477 needs to be verified
4478 Usage
4479 -----
4480 input_dict = {
4481 "edge1":{
4482 "vni": [
4483 {
4484 "75100":{
4485 "vrf": "RED",
4486 "vxlanIntf": "vxlan75100",
4487 "localVtepIp": "120.1.1.1",
4488 "sviIntf": "br100"
4489 }
4490 }
4491 ]
4492 }
4493 }
4494
4495 result = verify_evpn_vni(tgen, input_dict)
4496
4497 Returns
4498 -------
4499 errormsg(str) or True
4500 """
4501
4502 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
4503 for dut in input_dict.keys():
4504 rnode = tgen.gears[dut]
4505
4506 logger.info("[DUT: %s]: Verifying evpn vni details :", dut)
4507
4508 cmd = "show evpn vni detail json"
4509 evpn_all_vni_json = run_frr_cmd(rnode, cmd, isjson=True)
4510 if not bool(evpn_all_vni_json):
4511 errormsg = "No output for '{}' cli".format(cmd)
4512 return errormsg
4513
4514 if "vni" in input_dict[dut]:
4515 for vni_dict in input_dict[dut]["vni"]:
4516 found = False
4517 vni = vni_dict["name"]
4518 for evpn_vni_json in evpn_all_vni_json:
4519 if "vni" in evpn_vni_json:
4520 if evpn_vni_json["vni"] != int(vni):
4521 continue
4522
4523 for attribute in vni_dict.keys():
4524 if vni_dict[attribute] != evpn_vni_json[attribute]:
4525 errormsg = (
4526 "[DUT: %s] Verifying "
4527 "%s for VNI: %s [FAILED]||"
4528 ", EXPECTED : %s "
4529 " FOUND : %s"
4530 % (
4531 dut,
4532 attribute,
4533 vni,
4534 vni_dict[attribute],
4535 evpn_vni_json[attribute],
4536 )
4537 )
4538 return errormsg
4539
4540 else:
4541 found = True
4542 logger.info(
4543 "[DUT: %s] Verifying"
4544 " %s for VNI: %s , "
4545 "Found Expected : %s ",
4546 dut,
4547 attribute,
4548 vni,
4549 evpn_vni_json[attribute],
4550 )
4551
4552 if evpn_vni_json["state"] != "Up":
4553 errormsg = (
4554 "[DUT: %s] Failed: Verifying"
4555 " State for VNI: %s is not Up" % (dut, vni)
4556 )
4557 return errormsg
4558
4559 else:
4560 errormsg = (
4561 "[DUT: %s] Failed:"
4562 " VNI: %s is not present in JSON" % (dut, vni)
4563 )
4564 return errormsg
4565
4566 if found:
4567 logger.info(
4568 "[DUT %s]: Verifying VNI : %s "
4569 "details and state is Up [PASSED]!!",
4570 dut,
4571 vni,
4572 )
4573 return True
4574
4575 else:
4576 errormsg = (
4577 "[DUT: %s] Failed:" " vni details are not present in input data" % (dut)
4578 )
4579 return errormsg
4580
4581 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
4582 return False
4583
4584
4585 @retry(retry_timeout=12)
4586 def verify_vrf_vni(tgen, input_dict):
4587 """
4588 API to verify vrf vni details using "show vrf vni json"
4589 command.
4590 Parameters
4591 ----------
4592 * `tgen`: topogen object
4593 * `input_dict`: having details like - for which router, evpn details
4594 needs to be verified
4595 Usage
4596 -----
4597 input_dict = {
4598 "edge1":{
4599 "vrfs": [
4600 {
4601 "RED":{
4602 "vni": 75000,
4603 "vxlanIntf": "vxlan75100",
4604 "sviIntf": "br100",
4605 "routerMac": "00:80:48:ba:d1:00",
4606 "state": "Up"
4607 }
4608 }
4609 ]
4610 }
4611 }
4612
4613 result = verify_vrf_vni(tgen, input_dict)
4614
4615 Returns
4616 -------
4617 errormsg(str) or True
4618 """
4619
4620 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
4621 for dut in input_dict.keys():
4622 rnode = tgen.gears[dut]
4623
4624 logger.info("[DUT: %s]: Verifying vrf vni details :", dut)
4625
4626 cmd = "show vrf vni json"
4627 vrf_all_vni_json = run_frr_cmd(rnode, cmd, isjson=True)
4628 if not bool(vrf_all_vni_json):
4629 errormsg = "No output for '{}' cli".format(cmd)
4630 return errormsg
4631
4632 if "vrfs" in input_dict[dut]:
4633 for vrfs in input_dict[dut]["vrfs"]:
4634 for vrf, vrf_dict in vrfs.items():
4635 found = False
4636 for vrf_vni_json in vrf_all_vni_json["vrfs"]:
4637 if "vrf" in vrf_vni_json:
4638 if vrf_vni_json["vrf"] != vrf:
4639 continue
4640
4641 for attribute in vrf_dict.keys():
4642 if vrf_dict[attribute] == vrf_vni_json[attribute]:
4643 found = True
4644 logger.info(
4645 "[DUT %s]: VRF: %s, "
4646 "verifying %s "
4647 ", Found Expected: %s "
4648 "[PASSED]!!",
4649 dut,
4650 vrf,
4651 attribute,
4652 vrf_vni_json[attribute],
4653 )
4654 else:
4655 errormsg = (
4656 "[DUT: %s] VRF: %s, "
4657 "verifying %s [FAILED!!] "
4658 ", EXPECTED : %s "
4659 ", FOUND : %s"
4660 % (
4661 dut,
4662 vrf,
4663 attribute,
4664 vrf_dict[attribute],
4665 vrf_vni_json[attribute],
4666 )
4667 )
4668 return errormsg
4669
4670 else:
4671 errormsg = "[DUT: %s] VRF: %s " "is not present in JSON" % (
4672 dut,
4673 vrf,
4674 )
4675 return errormsg
4676
4677 if found:
4678 logger.info(
4679 "[DUT %s] Verifying VRF: %s " " details [PASSED]!!",
4680 dut,
4681 vrf,
4682 )
4683 return True
4684
4685 else:
4686 errormsg = (
4687 "[DUT: %s] Failed:" " vrf details are not present in input data" % (dut)
4688 )
4689 return errormsg
4690
4691 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
4692 return False
4693
4694
4695 def required_linux_kernel_version(required_version):
4696 """
4697 This API is used to check linux version compatibility of the test suite.
4698 If version mentioned in required_version is higher than the linux kernel
4699 of the system, test suite will be skipped. This API returns true or errormsg.
4700
4701 Parameters
4702 ----------
4703 * `required_version` : Kernel version required for the suites to run.
4704
4705 Usage
4706 -----
4707 result = linux_kernel_version_lowerthan('4.15')
4708
4709 Returns
4710 -------
4711 errormsg(str) or True
4712 """
4713 system_kernel = platform.release()
4714 if version_cmp(system_kernel, required_version) < 0:
4715 error_msg = (
4716 'These tests will not run on kernel "{}", '
4717 "they require kernel >= {})".format(system_kernel, required_version)
4718 )
4719
4720 logger.info(error_msg)
4721
4722 return error_msg
4723 return True
4724
4725
4726 class HostApplicationHelper(object):
4727 """Helper to track and cleanup per-host based test processes."""
4728
4729 def __init__(self, tgen=None, base_cmd=None):
4730 self.base_cmd_str = ""
4731 self.host_procs = {}
4732 self.tgen = None
4733 self.set_base_cmd(base_cmd if base_cmd else [])
4734 if tgen is not None:
4735 self.init(tgen)
4736
4737 def __enter__(self):
4738 self.init()
4739 return self
4740
4741 def __exit__(self, type, value, traceback):
4742 self.cleanup()
4743
4744 def __str__(self):
4745 return "HostApplicationHelper({})".format(self.base_cmd_str)
4746
4747 def set_base_cmd(self, base_cmd):
4748 assert isinstance(base_cmd, list) or isinstance(base_cmd, tuple)
4749 self.base_cmd = base_cmd
4750 if base_cmd:
4751 self.base_cmd_str = " ".join(base_cmd)
4752 else:
4753 self.base_cmd_str = ""
4754
4755 def init(self, tgen=None):
4756 """Initialize the helper with tgen if needed.
4757
4758 If overridden, need to handle multiple entries but one init. Will be called on
4759 object creation if tgen is supplied. Will be called again on __enter__ so should
4760 not re-init if already inited.
4761 """
4762 if self.tgen:
4763 assert tgen is None or self.tgen == tgen
4764 else:
4765 self.tgen = tgen
4766
4767 def started_proc(self, host, p):
4768 """Called after process started on host.
4769
4770 Return value is passed to `stopping_proc` method."""
4771 logger.debug("%s: Doing nothing after starting process", self)
4772 return False
4773
4774 def stopping_proc(self, host, p, info):
4775 """Called after process started on host."""
4776 logger.debug("%s: Doing nothing before stopping process", self)
4777
4778 def _add_host_proc(self, host, p):
4779 v = self.started_proc(host, p)
4780
4781 if host not in self.host_procs:
4782 self.host_procs[host] = []
4783 logger.debug("%s: %s: tracking process %s", self, host, p)
4784 self.host_procs[host].append((p, v))
4785
4786 def stop_host(self, host):
4787 """Stop the process on the host.
4788
4789 Override to do additional cleanup."""
4790 if host in self.host_procs:
4791 hlogger = self.tgen.net[host].logger
4792 for p, v in self.host_procs[host]:
4793 self.stopping_proc(host, p, v)
4794 logger.debug("%s: %s: terminating process %s", self, host, p.pid)
4795 hlogger.debug("%s: %s: terminating process %s", self, host, p.pid)
4796 rc = p.poll()
4797 if rc is not None:
4798 logger.error(
4799 "%s: %s: process early exit %s: %s",
4800 self,
4801 host,
4802 p.pid,
4803 comm_error(p),
4804 )
4805 hlogger.error(
4806 "%s: %s: process early exit %s: %s",
4807 self,
4808 host,
4809 p.pid,
4810 comm_error(p),
4811 )
4812 else:
4813 p.terminate()
4814 p.wait()
4815 logger.debug(
4816 "%s: %s: terminated process %s: %s",
4817 self,
4818 host,
4819 p.pid,
4820 comm_error(p),
4821 )
4822 hlogger.debug(
4823 "%s: %s: terminated process %s: %s",
4824 self,
4825 host,
4826 p.pid,
4827 comm_error(p),
4828 )
4829
4830 del self.host_procs[host]
4831
4832 def stop_all_hosts(self):
4833 hosts = set(self.host_procs)
4834 for host in hosts:
4835 self.stop_host(host)
4836
4837 def cleanup(self):
4838 self.stop_all_hosts()
4839
4840 def run(self, host, cmd_args, **kwargs):
4841 cmd = list(self.base_cmd)
4842 cmd.extend(cmd_args)
4843 p = self.tgen.gears[host].popen(cmd, **kwargs)
4844 assert p.poll() is None
4845 self._add_host_proc(host, p)
4846 return p
4847
4848 def check_procs(self):
4849 """Check that all current processes are running, log errors if not.
4850
4851 Returns: List of stopped processes."""
4852 procs = []
4853
4854 logger.debug("%s: checking procs on hosts %s", self, self.host_procs.keys())
4855
4856 for host in self.host_procs:
4857 hlogger = self.tgen.net[host].logger
4858 for p, _ in self.host_procs[host]:
4859 logger.debug("%s: checking %s proc %s", self, host, p)
4860 rc = p.poll()
4861 if rc is None:
4862 continue
4863 logger.error(
4864 "%s: %s proc exited: %s", self, host, comm_error(p), exc_info=True
4865 )
4866 hlogger.error(
4867 "%s: %s proc exited: %s", self, host, comm_error(p), exc_info=True
4868 )
4869 procs.append(p)
4870 return procs
4871
4872
4873 class IPerfHelper(HostApplicationHelper):
4874 def __str__(self):
4875 return "IPerfHelper()"
4876
4877 def run_join(
4878 self,
4879 host,
4880 join_addr,
4881 l4Type="UDP",
4882 join_interval=1,
4883 join_intf=None,
4884 join_towards=None,
4885 ):
4886 """
4887 Use iperf to send IGMP join and listen to traffic
4888
4889 Parameters:
4890 -----------
4891 * `host`: iperf host from where IGMP join would be sent
4892 * `l4Type`: string, one of [ TCP, UDP ]
4893 * `join_addr`: multicast address (or addresses) to join to
4894 * `join_interval`: seconds between periodic bandwidth reports
4895 * `join_intf`: the interface to bind the join to
4896 * `join_towards`: router whos interface to bind the join to
4897
4898 returns: Success (bool)
4899 """
4900
4901 iperf_path = self.tgen.net.get_exec_path("iperf")
4902
4903 assert join_addr
4904 if not isinstance(join_addr, list) and not isinstance(join_addr, tuple):
4905 join_addr = [ipaddress.IPv4Address(frr_unicode(join_addr))]
4906
4907 for bindTo in join_addr:
4908 iperf_args = [iperf_path, "-s"]
4909
4910 if l4Type == "UDP":
4911 iperf_args.append("-u")
4912
4913 iperf_args.append("-B")
4914 if join_towards:
4915 to_intf = frr_unicode(
4916 self.tgen.json_topo["routers"][host]["links"][join_towards][
4917 "interface"
4918 ]
4919 )
4920 iperf_args.append("{}%{}".format(str(bindTo), to_intf))
4921 elif join_intf:
4922 iperf_args.append("{}%{}".format(str(bindTo), join_intf))
4923 else:
4924 iperf_args.append(str(bindTo))
4925
4926 if join_interval:
4927 iperf_args.append("-i")
4928 iperf_args.append(str(join_interval))
4929
4930 p = self.run(host, iperf_args)
4931 if p.poll() is not None:
4932 logger.error("IGMP join failed on %s: %s", bindTo, comm_error(p))
4933 return False
4934 return True
4935
4936 def run_traffic(
4937 self, host, sentToAddress, ttl, time=0, l4Type="UDP", bind_towards=None
4938 ):
4939 """
4940 Run iperf to send IGMP join and traffic
4941
4942 Parameters:
4943 -----------
4944 * `host`: iperf host to send traffic from
4945 * `l4Type`: string, one of [ TCP, UDP ]
4946 * `sentToAddress`: multicast address to send traffic to
4947 * `ttl`: time to live
4948 * `time`: time in seconds to transmit for
4949 * `bind_towards`: Router who's interface the source ip address is got from
4950
4951 returns: Success (bool)
4952 """
4953
4954 iperf_path = self.tgen.net.get_exec_path("iperf")
4955
4956 if sentToAddress and not isinstance(sentToAddress, list):
4957 sentToAddress = [ipaddress.IPv4Address(frr_unicode(sentToAddress))]
4958
4959 for sendTo in sentToAddress:
4960 iperf_args = [iperf_path, "-c", sendTo]
4961
4962 # Bind to Interface IP
4963 if bind_towards:
4964 ifaddr = frr_unicode(
4965 self.tgen.json_topo["routers"][host]["links"][bind_towards]["ipv4"]
4966 )
4967 ipaddr = ipaddress.IPv4Interface(ifaddr).ip
4968 iperf_args.append("-B")
4969 iperf_args.append(str(ipaddr))
4970
4971 # UDP/TCP
4972 if l4Type == "UDP":
4973 iperf_args.append("-u")
4974 iperf_args.append("-b")
4975 iperf_args.append("0.012m")
4976
4977 # TTL
4978 if ttl:
4979 iperf_args.append("-T")
4980 iperf_args.append(str(ttl))
4981
4982 # Time
4983 if time:
4984 iperf_args.append("-t")
4985 iperf_args.append(str(time))
4986
4987 p = self.run(host, iperf_args)
4988 if p.poll() is not None:
4989 logger.error(
4990 "mcast traffic send failed for %s: %s", sendTo, comm_error(p)
4991 )
4992 return False
4993
4994 return True
4995
4996
4997 def verify_ip_nht(tgen, input_dict):
4998 """
4999 Running "show ip nht" command and verifying given nexthop resolution
5000 Parameters
5001 ----------
5002 * `tgen` : topogen object
5003 * `input_dict`: data to verify nexthop
5004 Usage
5005 -----
5006 input_dict_4 = {
5007 "r1": {
5008 nh: {
5009 "Address": nh,
5010 "resolvedVia": "connected",
5011 "nexthops": {
5012 "nexthop1": {
5013 "Interface": intf
5014 }
5015 }
5016 }
5017 }
5018 }
5019 result = verify_ip_nht(tgen, input_dict_4)
5020 Returns
5021 -------
5022 errormsg(str) or True
5023 """
5024
5025 logger.debug("Entering lib API: verify_ip_nht()")
5026
5027 router_list = tgen.routers()
5028 for router in input_dict.keys():
5029 if router not in router_list:
5030 continue
5031
5032 rnode = router_list[router]
5033 nh_list = input_dict[router]
5034
5035 if validate_ip_address(next(iter(nh_list))) == "ipv6":
5036 show_ip_nht = run_frr_cmd(rnode, "show ipv6 nht")
5037 else:
5038 show_ip_nht = run_frr_cmd(rnode, "show ip nht")
5039
5040 for nh in nh_list:
5041 if nh in show_ip_nht:
5042 nht = run_frr_cmd(rnode, f"show ip nht {nh}")
5043 if "unresolved" in nht:
5044 errormsg = "Nexthop {} became unresolved on {}".format(nh, router)
5045 return errormsg
5046 else:
5047 logger.info("Nexthop %s is resolved on %s", nh, router)
5048 return True
5049 else:
5050 errormsg = "Nexthop {} is resolved on {}".format(nh, router)
5051 return errormsg
5052
5053 logger.debug("Exiting lib API: verify_ip_nht()")
5054 return False
5055
5056
5057 def scapy_send_raw_packet(tgen, topo, senderRouter, intf, packet=None):
5058 """
5059 Using scapy Raw() method to send BSR raw packet from one FRR
5060 to other
5061
5062 Parameters:
5063 -----------
5064 * `tgen` : Topogen object
5065 * `topo` : json file data
5066 * `senderRouter` : Sender router
5067 * `packet` : packet in raw format
5068
5069 returns:
5070 --------
5071 errormsg or True
5072 """
5073
5074 global CD
5075 result = ""
5076 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
5077 sender_interface = intf
5078 rnode = tgen.routers()[senderRouter]
5079
5080 for destLink, data in topo["routers"][senderRouter]["links"].items():
5081 if "type" in data and data["type"] == "loopback":
5082 continue
5083
5084 if not packet:
5085 packet = topo["routers"][senderRouter]["pkt"]["test_packets"][packet][
5086 "data"
5087 ]
5088
5089 python3_path = tgen.net.get_exec_path(["python3", "python"])
5090 script_path = os.path.join(CD, "send_bsr_packet.py")
5091 cmd = "{} {} '{}' '{}' --interval=1 --count=1".format(
5092 python3_path, script_path, packet, sender_interface
5093 )
5094
5095 logger.info("Scapy cmd: \n %s", cmd)
5096 result = rnode.run(cmd)
5097
5098 if result == "":
5099 return result
5100
5101 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
5102 return True