]> git.proxmox.com Git - mirror_frr.git/blob - tests/topotests/lib/common_config.py
Merge pull request #12260 from opensourcerouting/pim-plist-masklen
[mirror_frr.git] / tests / topotests / lib / common_config.py
1 #
2 # Copyright (c) 2019 by VMware, Inc. ("VMware")
3 # Used Copyright (c) 2018 by Network Device Education Foundation, Inc.
4 # ("NetDEF") in this file.
5 #
6 # Permission to use, copy, modify, and/or distribute this software
7 # for any purpose with or without fee is hereby granted, provided
8 # that the above copyright notice and this permission notice appear
9 # in all copies.
10 #
11 # THE SOFTWARE IS PROVIDED "AS IS" AND VMWARE DISCLAIMS ALL WARRANTIES
12 # WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
13 # MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL VMWARE BE LIABLE FOR
14 # ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY
15 # DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS,
16 # WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
17 # ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE
18 # OF THIS SOFTWARE.
19 #
20
21 import ipaddress
22 import json
23 import os
24 import platform
25 import socket
26 import subprocess
27 import sys
28 import traceback
29 import functools
30 from collections import OrderedDict
31 from copy import deepcopy
32 from datetime import datetime, timedelta
33 from functools import wraps
34 from re import search as re_search
35 from time import sleep
36
37 try:
38 # Imports from python2
39 import ConfigParser as configparser
40 except ImportError:
41 # Imports from python3
42 import configparser
43
44 from lib.micronet import comm_error
45 from lib.topogen import TopoRouter, get_topogen
46 from lib.topolog import get_logger, logger
47 from lib.topotest import frr_unicode, interface_set_status, version_cmp
48 from lib import topotest
49
50 FRRCFG_FILE = "frr_json.conf"
51 FRRCFG_BKUP_FILE = "frr_json_initial.conf"
52
53 ERROR_LIST = ["Malformed", "Failure", "Unknown", "Incomplete"]
54
55 ####
56 CD = os.path.dirname(os.path.realpath(__file__))
57 PYTESTINI_PATH = os.path.join(CD, "../pytest.ini")
58
59 # NOTE: to save execution logs to log file frrtest_log_dir must be configured
60 # in `pytest.ini`.
61 config = configparser.ConfigParser()
62 config.read(PYTESTINI_PATH)
63
64 config_section = "topogen"
65
66 # Debug logs for daemons
67 DEBUG_LOGS = {
68 "pimd": [
69 "debug msdp events",
70 "debug msdp packets",
71 "debug igmp events",
72 "debug igmp trace",
73 "debug mroute",
74 "debug mroute detail",
75 "debug pim events",
76 "debug pim packets",
77 "debug pim trace",
78 "debug pim zebra",
79 "debug pim bsm",
80 "debug pim packets joins",
81 "debug pim packets register",
82 "debug pim nht",
83 ],
84 "bgpd": [
85 "debug bgp neighbor-events",
86 "debug bgp updates",
87 "debug bgp zebra",
88 "debug bgp nht",
89 "debug bgp neighbor-events",
90 "debug bgp graceful-restart",
91 "debug bgp update-groups",
92 "debug bgp vpn leak-from-vrf",
93 "debug bgp vpn leak-to-vrf",
94 "debug bgp zebr",
95 "debug bgp updates",
96 "debug bgp nht",
97 "debug bgp neighbor-events",
98 "debug vrf",
99 ],
100 "zebra": [
101 "debug zebra events",
102 "debug zebra rib",
103 "debug zebra vxlan",
104 "debug zebra nht",
105 ],
106 "ospf": [
107 "debug ospf event",
108 "debug ospf ism",
109 "debug ospf lsa",
110 "debug ospf nsm",
111 "debug ospf nssa",
112 "debug ospf packet all",
113 "debug ospf sr",
114 "debug ospf te",
115 "debug ospf zebra",
116 ],
117 "ospf6": [
118 "debug ospf6 event",
119 "debug ospf6 ism",
120 "debug ospf6 lsa",
121 "debug ospf6 nsm",
122 "debug ospf6 nssa",
123 "debug ospf6 packet all",
124 "debug ospf6 sr",
125 "debug ospf6 te",
126 "debug ospf6 zebra",
127 ],
128 }
129
130 g_iperf_client_procs = {}
131 g_iperf_server_procs = {}
132
133
134 def is_string(value):
135 try:
136 return isinstance(value, basestring)
137 except NameError:
138 return isinstance(value, str)
139
140
141 if config.has_option("topogen", "verbosity"):
142 loglevel = config.get("topogen", "verbosity")
143 loglevel = loglevel.lower()
144 else:
145 loglevel = "info"
146
147 if config.has_option("topogen", "frrtest_log_dir"):
148 frrtest_log_dir = config.get("topogen", "frrtest_log_dir")
149 time_stamp = datetime.time(datetime.now())
150 logfile_name = "frr_test_bgp_"
151 frrtest_log_file = frrtest_log_dir + logfile_name + str(time_stamp)
152 print("frrtest_log_file..", frrtest_log_file)
153
154 logger = get_logger(
155 "test_execution_logs", log_level=loglevel, target=frrtest_log_file
156 )
157 print("Logs will be sent to logfile: {}".format(frrtest_log_file))
158
159 if config.has_option("topogen", "show_router_config"):
160 show_router_config = config.get("topogen", "show_router_config")
161 else:
162 show_router_config = False
163
164 # env variable for setting what address type to test
165 ADDRESS_TYPES = os.environ.get("ADDRESS_TYPES")
166
167
168 # Saves sequence id numbers
169 SEQ_ID = {"prefix_lists": {}, "route_maps": {}}
170
171
172 def get_seq_id(obj_type, router, obj_name):
173 """
174 Generates and saves sequence number in interval of 10
175 Parameters
176 ----------
177 * `obj_type`: prefix_lists or route_maps
178 * `router`: router name
179 *` obj_name`: name of the prefix-list or route-map
180 Returns
181 --------
182 Sequence number generated
183 """
184
185 router_data = SEQ_ID[obj_type].setdefault(router, {})
186 obj_data = router_data.setdefault(obj_name, {})
187 seq_id = obj_data.setdefault("seq_id", 0)
188
189 seq_id = int(seq_id) + 10
190 obj_data["seq_id"] = seq_id
191
192 return seq_id
193
194
195 def set_seq_id(obj_type, router, id, obj_name):
196 """
197 Saves sequence number if not auto-generated and given by user
198 Parameters
199 ----------
200 * `obj_type`: prefix_lists or route_maps
201 * `router`: router name
202 *` obj_name`: name of the prefix-list or route-map
203 """
204 router_data = SEQ_ID[obj_type].setdefault(router, {})
205 obj_data = router_data.setdefault(obj_name, {})
206 seq_id = obj_data.setdefault("seq_id", 0)
207
208 seq_id = int(seq_id) + int(id)
209 obj_data["seq_id"] = seq_id
210
211
212 class InvalidCLIError(Exception):
213 """Raise when the CLI command is wrong"""
214
215
216 def run_frr_cmd(rnode, cmd, isjson=False):
217 """
218 Execute frr show commands in privileged mode
219 * `rnode`: router node on which command needs to be executed
220 * `cmd`: Command to be executed on frr
221 * `isjson`: If command is to get json data or not
222 :return str:
223 """
224
225 if cmd:
226 ret_data = rnode.vtysh_cmd(cmd, isjson=isjson)
227
228 if isjson:
229 rnode.vtysh_cmd(cmd.rstrip("json"), isjson=False)
230
231 return ret_data
232
233 else:
234 raise InvalidCLIError("No actual cmd passed")
235
236
237 def apply_raw_config(tgen, input_dict):
238
239 """
240 API to configure raw configuration on device. This can be used for any cli
241 which has not been implemented in JSON.
242
243 Parameters
244 ----------
245 * `tgen`: tgen object
246 * `input_dict`: configuration that needs to be applied
247
248 Usage
249 -----
250 input_dict = {
251 "r2": {
252 "raw_config": [
253 "router bgp",
254 "no bgp update-group-split-horizon"
255 ]
256 }
257 }
258 Returns
259 -------
260 True or errormsg
261 """
262
263 rlist = []
264
265 for router_name in input_dict.keys():
266 config_cmd = input_dict[router_name]["raw_config"]
267
268 if not isinstance(config_cmd, list):
269 config_cmd = [config_cmd]
270
271 frr_cfg_file = "{}/{}/{}".format(tgen.logdir, router_name, FRRCFG_FILE)
272 with open(frr_cfg_file, "w") as cfg:
273 for cmd in config_cmd:
274 cfg.write("{}\n".format(cmd))
275
276 rlist.append(router_name)
277
278 # Load config on all routers
279 return load_config_to_routers(tgen, rlist)
280
281
282 def create_common_configurations(
283 tgen, config_dict, config_type=None, build=False, load_config=True
284 ):
285 """
286 API to create object of class FRRConfig and also create frr_json.conf
287 file. It will create interface and common configurations and save it to
288 frr_json.conf and load to router
289 Parameters
290 ----------
291 * `tgen`: tgen object
292 * `config_dict`: Configuration data saved in a dict of { router: config-list }
293 * `routers` : list of router id to be configured.
294 * `config_type` : Syntactic information while writing configuration. Should
295 be one of the value as mentioned in the config_map below.
296 * `build` : Only for initial setup phase this is set as True
297 Returns
298 -------
299 True or False
300 """
301
302 config_map = OrderedDict(
303 {
304 "general_config": "! FRR General Config\n",
305 "debug_log_config": "! Debug log Config\n",
306 "interface_config": "! Interfaces Config\n",
307 "static_route": "! Static Route Config\n",
308 "prefix_list": "! Prefix List Config\n",
309 "bgp_community_list": "! Community List Config\n",
310 "route_maps": "! Route Maps Config\n",
311 "bgp": "! BGP Config\n",
312 "vrf": "! VRF Config\n",
313 "ospf": "! OSPF Config\n",
314 "ospf6": "! OSPF Config\n",
315 "pim": "! PIM Config\n",
316 }
317 )
318
319 if build:
320 mode = "a"
321 elif not load_config:
322 mode = "a"
323 else:
324 mode = "w"
325
326 routers = config_dict.keys()
327 for router in routers:
328 fname = "{}/{}/{}".format(tgen.logdir, router, FRRCFG_FILE)
329 try:
330 frr_cfg_fd = open(fname, mode)
331 if config_type:
332 frr_cfg_fd.write(config_map[config_type])
333 for line in config_dict[router]:
334 frr_cfg_fd.write("{} \n".format(str(line)))
335 frr_cfg_fd.write("\n")
336
337 except IOError as err:
338 logger.error("Unable to open FRR Config '%s': %s" % (fname, str(err)))
339 return False
340 finally:
341 frr_cfg_fd.close()
342
343 # If configuration applied from build, it will done at last
344 result = True
345 if not build and load_config:
346 result = load_config_to_routers(tgen, routers)
347
348 return result
349
350
351 def create_common_configuration(
352 tgen, router, data, config_type=None, build=False, load_config=True
353 ):
354 """
355 API to create object of class FRRConfig and also create frr_json.conf
356 file. It will create interface and common configurations and save it to
357 frr_json.conf and load to router
358 Parameters
359 ----------
360 * `tgen`: tgen object
361 * `data`: Configuration data saved in a list.
362 * `router` : router id to be configured.
363 * `config_type` : Syntactic information while writing configuration. Should
364 be one of the value as mentioned in the config_map below.
365 * `build` : Only for initial setup phase this is set as True
366 Returns
367 -------
368 True or False
369 """
370 return create_common_configurations(
371 tgen, {router: data}, config_type, build, load_config
372 )
373
374
375 def kill_router_daemons(tgen, router, daemons, save_config=True):
376 """
377 Router's current config would be saved to /etc/frr/ for each daemon
378 and daemon would be killed forcefully using SIGKILL.
379 * `tgen` : topogen object
380 * `router`: Device under test
381 * `daemons`: list of daemons to be killed
382 """
383
384 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
385
386 try:
387 router_list = tgen.routers()
388
389 if save_config:
390 # Saving router config to /etc/frr, which will be loaded to router
391 # when it starts
392 router_list[router].vtysh_cmd("write memory")
393
394 # Kill Daemons
395 result = router_list[router].killDaemons(daemons)
396 if len(result) > 0:
397 assert "Errors found post shutdown - details follow:" == 0, result
398 return result
399
400 except Exception as e:
401 errormsg = traceback.format_exc()
402 logger.error(errormsg)
403 return errormsg
404
405
406 def start_router_daemons(tgen, router, daemons):
407 """
408 Daemons defined by user would be started
409 * `tgen` : topogen object
410 * `router`: Device under test
411 * `daemons`: list of daemons to be killed
412 """
413
414 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
415
416 try:
417 router_list = tgen.routers()
418
419 # Start daemons
420 res = router_list[router].startDaemons(daemons)
421
422 except Exception as e:
423 errormsg = traceback.format_exc()
424 logger.error(errormsg)
425 res = errormsg
426
427 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
428 return res
429
430
431 def check_router_status(tgen):
432 """
433 Check if all daemons are running for all routers in topology
434 * `tgen` : topogen object
435 """
436
437 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
438
439 try:
440 router_list = tgen.routers()
441 for router, rnode in router_list.items():
442
443 result = rnode.check_router_running()
444 if result != "":
445 daemons = []
446 if "bgpd" in result:
447 daemons.append("bgpd")
448 if "zebra" in result:
449 daemons.append("zebra")
450 if "pimd" in result:
451 daemons.append("pimd")
452 if "pim6d" in result:
453 daemons.append("pim6d")
454 if "ospfd" in result:
455 daemons.append("ospfd")
456 if "ospf6d" in result:
457 daemons.append("ospf6d")
458 rnode.startDaemons(daemons)
459
460 except Exception as e:
461 errormsg = traceback.format_exc()
462 logger.error(errormsg)
463 return errormsg
464
465 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
466 return True
467
468
469 def save_initial_config_on_routers(tgen):
470 """Save current configuration on routers to FRRCFG_BKUP_FILE.
471
472 FRRCFG_BKUP_FILE is the file that will be restored when `reset_config_on_routers()`
473 is called.
474
475 Parameters
476 ----------
477 * `tgen` : Topogen object
478 """
479 router_list = tgen.routers()
480 target_cfg_fmt = tgen.logdir + "/{}/frr_json_initial.conf"
481
482 # Get all running configs in parallel
483 procs = {}
484 for rname in router_list:
485 logger.info("Fetching running config for router %s", rname)
486 procs[rname] = router_list[rname].popen(
487 ["/usr/bin/env", "vtysh", "-c", "show running-config no-header"],
488 stdin=None,
489 stdout=open(target_cfg_fmt.format(rname), "w"),
490 stderr=subprocess.PIPE,
491 )
492 for rname, p in procs.items():
493 _, error = p.communicate()
494 if p.returncode:
495 logger.error(
496 "Get running config for %s failed %d: %s", rname, p.returncode, error
497 )
498 raise InvalidCLIError(
499 "vtysh show running error on {}: {}".format(rname, error)
500 )
501
502
503 def reset_config_on_routers(tgen, routerName=None):
504 """
505 Resets configuration on routers to the snapshot created using input JSON
506 file. It replaces existing router configuration with FRRCFG_BKUP_FILE
507
508 Parameters
509 ----------
510 * `tgen` : Topogen object
511 * `routerName` : router config is to be reset
512 """
513
514 logger.debug("Entering API: reset_config_on_routers")
515
516 tgen.cfg_gen += 1
517 gen = tgen.cfg_gen
518
519 # Trim the router list if needed
520 router_list = tgen.routers()
521 if routerName:
522 if routerName not in router_list:
523 logger.warning(
524 "Exiting API: reset_config_on_routers: no router %s",
525 routerName,
526 exc_info=True,
527 )
528 return True
529 router_list = {routerName: router_list[routerName]}
530
531 delta_fmt = tgen.logdir + "/{}/delta-{}.conf"
532 # FRRCFG_BKUP_FILE
533 target_cfg_fmt = tgen.logdir + "/{}/frr_json_initial.conf"
534 run_cfg_fmt = tgen.logdir + "/{}/frr-{}.sav"
535
536 #
537 # Get all running configs in parallel
538 #
539 procs = {}
540 for rname in router_list:
541 logger.info("Fetching running config for router %s", rname)
542 procs[rname] = router_list[rname].popen(
543 ["/usr/bin/env", "vtysh", "-c", "show running-config no-header"],
544 stdin=None,
545 stdout=open(run_cfg_fmt.format(rname, gen), "w"),
546 stderr=subprocess.PIPE,
547 )
548 for rname, p in procs.items():
549 _, error = p.communicate()
550 if p.returncode:
551 logger.error(
552 "Get running config for %s failed %d: %s", rname, p.returncode, error
553 )
554 raise InvalidCLIError(
555 "vtysh show running error on {}: {}".format(rname, error)
556 )
557
558 #
559 # Get all delta's in parallel
560 #
561 procs = {}
562 for rname in router_list:
563 logger.info(
564 "Generating delta for router %s to new configuration (gen %d)", rname, gen
565 )
566 procs[rname] = tgen.net.popen(
567 [
568 "/usr/lib/frr/frr-reload.py",
569 "--test-reset",
570 "--input",
571 run_cfg_fmt.format(rname, gen),
572 "--test",
573 target_cfg_fmt.format(rname),
574 ],
575 stdin=None,
576 stdout=open(delta_fmt.format(rname, gen), "w"),
577 stderr=subprocess.PIPE,
578 )
579 for rname, p in procs.items():
580 _, error = p.communicate()
581 if p.returncode:
582 logger.error(
583 "Delta file creation for %s failed %d: %s", rname, p.returncode, error
584 )
585 raise InvalidCLIError("frr-reload error for {}: {}".format(rname, error))
586
587 #
588 # Apply all the deltas in parallel
589 #
590 procs = {}
591 for rname in router_list:
592 logger.info("Applying delta config on router %s", rname)
593
594 procs[rname] = router_list[rname].popen(
595 ["/usr/bin/env", "vtysh", "-f", delta_fmt.format(rname, gen)],
596 stdin=None,
597 stdout=subprocess.PIPE,
598 stderr=subprocess.STDOUT,
599 )
600 for rname, p in procs.items():
601 output, _ = p.communicate()
602 vtysh_command = "vtysh -f {}".format(delta_fmt.format(rname, gen))
603 if not p.returncode:
604 router_list[rname].logger.info(
605 '\nvtysh config apply => "{}"\nvtysh output <= "{}"'.format(
606 vtysh_command, output
607 )
608 )
609 else:
610 router_list[rname].logger.warning(
611 '\nvtysh config apply failed => "{}"\nvtysh output <= "{}"'.format(
612 vtysh_command, output
613 )
614 )
615 logger.error(
616 "Delta file apply for %s failed %d: %s", rname, p.returncode, output
617 )
618
619 # We really need to enable this failure; however, currently frr-reload.py
620 # producing invalid "no" commands as it just preprends "no", but some of the
621 # command forms lack matching values (e.g., final values). Until frr-reload
622 # is fixed to handle this (or all the CLI no forms are adjusted) we can't
623 # fail tests.
624 # raise InvalidCLIError("frr-reload error for {}: {}".format(rname, output))
625
626 #
627 # Optionally log all new running config if "show_router_config" is defined in
628 # "pytest.ini"
629 #
630 if show_router_config:
631 procs = {}
632 for rname in router_list:
633 logger.info("Fetching running config for router %s", rname)
634 procs[rname] = router_list[rname].popen(
635 ["/usr/bin/env", "vtysh", "-c", "show running-config no-header"],
636 stdin=None,
637 stdout=subprocess.PIPE,
638 stderr=subprocess.STDOUT,
639 )
640 for rname, p in procs.items():
641 output, _ = p.communicate()
642 if p.returncode:
643 logger.warning(
644 "Get running config for %s failed %d: %s",
645 rname,
646 p.returncode,
647 output,
648 )
649 else:
650 logger.info(
651 "Configuration on router %s after reset:\n%s", rname, output
652 )
653
654 logger.debug("Exiting API: reset_config_on_routers")
655 return True
656
657
658 def prep_load_config_to_routers(tgen, *config_name_list):
659 """Create common config for `load_config_to_routers`.
660
661 The common config file is constructed from the list of sub-config files passed as
662 position arguments to this function. Each entry in `config_name_list` is looked for
663 under the router sub-directory in the test directory and those files are
664 concatenated together to create the common config. e.g.,
665
666 # Routers are "r1" and "r2", test file is `example/test_example_foo.py`
667 prepare_load_config_to_routers(tgen, "bgpd.conf", "ospfd.conf")
668
669 When the above call is made the files in
670
671 example/r1/bgpd.conf
672 example/r1/ospfd.conf
673
674 Are concat'd together into a single config file that will be loaded on r1, and
675
676 example/r2/bgpd.conf
677 example/r2/ospfd.conf
678
679 Are concat'd together into a single config file that will be loaded on r2 when
680 the call to `load_config_to_routers` is made.
681 """
682
683 routers = tgen.routers()
684 for rname, router in routers.items():
685 destname = "{}/{}/{}".format(tgen.logdir, rname, FRRCFG_FILE)
686 wmode = "w"
687 for cfbase in config_name_list:
688 script_dir = os.environ["PYTEST_TOPOTEST_SCRIPTDIR"]
689 confname = os.path.join(script_dir, "{}/{}".format(rname, cfbase))
690 with open(confname, "r") as cf:
691 with open(destname, wmode) as df:
692 df.write(cf.read())
693 wmode = "a"
694
695
696 def load_config_to_routers(tgen, routers, save_bkup=False):
697 """
698 Loads configuration on routers from the file FRRCFG_FILE.
699
700 Parameters
701 ----------
702 * `tgen` : Topogen object
703 * `routers` : routers for which configuration is to be loaded
704 * `save_bkup` : If True, Saves snapshot of FRRCFG_FILE to FRRCFG_BKUP_FILE
705 Returns
706 -------
707 True or False
708 """
709
710 logger.debug("Entering API: load_config_to_routers")
711
712 tgen.cfg_gen += 1
713 gen = tgen.cfg_gen
714
715 base_router_list = tgen.routers()
716 router_list = {}
717 for router in routers:
718 if router not in base_router_list:
719 continue
720 router_list[router] = base_router_list[router]
721
722 frr_cfg_file_fmt = tgen.logdir + "/{}/" + FRRCFG_FILE
723 frr_cfg_save_file_fmt = tgen.logdir + "/{}/{}-" + FRRCFG_FILE
724 frr_cfg_bkup_fmt = tgen.logdir + "/{}/" + FRRCFG_BKUP_FILE
725
726 procs = {}
727 for rname in router_list:
728 router = router_list[rname]
729 try:
730 frr_cfg_file = frr_cfg_file_fmt.format(rname)
731 frr_cfg_save_file = frr_cfg_save_file_fmt.format(rname, gen)
732 frr_cfg_bkup = frr_cfg_bkup_fmt.format(rname)
733 with open(frr_cfg_file, "r+") as cfg:
734 data = cfg.read()
735 logger.info(
736 "Applying following configuration on router %s (gen: %d):\n%s",
737 rname,
738 gen,
739 data,
740 )
741 # Always save a copy of what we just did
742 with open(frr_cfg_save_file, "w") as bkup:
743 bkup.write(data)
744 if save_bkup:
745 with open(frr_cfg_bkup, "w") as bkup:
746 bkup.write(data)
747 procs[rname] = router_list[rname].popen(
748 ["/usr/bin/env", "vtysh", "-f", frr_cfg_file],
749 stdin=None,
750 stdout=subprocess.PIPE,
751 stderr=subprocess.STDOUT,
752 )
753 except IOError as err:
754 logger.error(
755 "Unable to open config File. error(%s): %s", err.errno, err.strerror
756 )
757 return False
758 except Exception as error:
759 logger.error("Unable to apply config on %s: %s", rname, str(error))
760 return False
761
762 errors = []
763 for rname, p in procs.items():
764 output, _ = p.communicate()
765 frr_cfg_file = frr_cfg_file_fmt.format(rname)
766 vtysh_command = "vtysh -f " + frr_cfg_file
767 if not p.returncode:
768 router_list[rname].logger.info(
769 '\nvtysh config apply => "{}"\nvtysh output <= "{}"'.format(
770 vtysh_command, output
771 )
772 )
773 else:
774 router_list[rname].logger.error(
775 '\nvtysh config apply failed => "{}"\nvtysh output <= "{}"'.format(
776 vtysh_command, output
777 )
778 )
779 logger.error(
780 "Config apply for %s failed %d: %s", rname, p.returncode, output
781 )
782 # We can't thorw an exception here as we won't clear the config file.
783 errors.append(
784 InvalidCLIError(
785 "load_config_to_routers error for {}: {}".format(rname, output)
786 )
787 )
788
789 # Empty the config file or we append to it next time through.
790 with open(frr_cfg_file, "r+") as cfg:
791 cfg.truncate(0)
792
793 # Router current configuration to log file or console if
794 # "show_router_config" is defined in "pytest.ini"
795 if show_router_config:
796 procs = {}
797 for rname in router_list:
798 procs[rname] = router_list[rname].popen(
799 ["/usr/bin/env", "vtysh", "-c", "show running-config no-header"],
800 stdin=None,
801 stdout=subprocess.PIPE,
802 stderr=subprocess.STDOUT,
803 )
804 for rname, p in procs.items():
805 output, _ = p.communicate()
806 if p.returncode:
807 logger.warning(
808 "Get running config for %s failed %d: %s",
809 rname,
810 p.returncode,
811 output,
812 )
813 else:
814 logger.info("New configuration for router %s:\n%s", rname, output)
815
816 logger.debug("Exiting API: load_config_to_routers")
817 return not errors
818
819
820 def load_config_to_router(tgen, routerName, save_bkup=False):
821 """
822 Loads configuration on router from the file FRRCFG_FILE.
823
824 Parameters
825 ----------
826 * `tgen` : Topogen object
827 * `routerName` : router for which configuration to be loaded
828 * `save_bkup` : If True, Saves snapshot of FRRCFG_FILE to FRRCFG_BKUP_FILE
829 """
830 return load_config_to_routers(tgen, [routerName], save_bkup)
831
832
833 def reset_with_new_configs(tgen, *cflist):
834 """Reset the router to initial config, then load new configs.
835
836 Resets routers to the initial config state (see `save_initial_config_on_routers()
837 and `reset_config_on_routers()` `), then concat list of router sub-configs together
838 and load onto the routers (see `prep_load_config_to_routers()` and
839 `load_config_to_routers()`)
840 """
841 routers = tgen.routers()
842
843 reset_config_on_routers(tgen)
844 prep_load_config_to_routers(tgen, *cflist)
845 load_config_to_routers(tgen, tgen.routers(), save_bkup=False)
846
847
848 def get_frr_ipv6_linklocal(tgen, router, intf=None, vrf=None):
849 """
850 API to get the link local ipv6 address of a particular interface using
851 FRR command 'show interface'
852
853 * `tgen`: tgen object
854 * `router` : router for which highest interface should be
855 calculated
856 * `intf` : interface for which link-local address needs to be taken
857 * `vrf` : VRF name
858
859 Usage
860 -----
861 linklocal = get_frr_ipv6_linklocal(tgen, router, "intf1", RED_A)
862
863 Returns
864 -------
865 1) array of interface names to link local ips.
866 """
867
868 router_list = tgen.routers()
869 for rname, rnode in router_list.items():
870 if rname != router:
871 continue
872
873 linklocal = []
874
875 if vrf:
876 cmd = "show interface vrf {}".format(vrf)
877 else:
878 cmd = "show interface"
879
880 linklocal = []
881 if vrf:
882 cmd = "show interface vrf {}".format(vrf)
883 else:
884 cmd = "show interface"
885 for chk_ll in range(0, 60):
886 sleep(1 / 4)
887 ifaces = router_list[router].run('vtysh -c "{}"'.format(cmd))
888 # Fix newlines (make them all the same)
889 ifaces = ("\n".join(ifaces.splitlines()) + "\n").splitlines()
890
891 interface = None
892 ll_per_if_count = 0
893 for line in ifaces:
894 # Interface name
895 m = re_search("Interface ([a-zA-Z0-9-]+) is", line)
896 if m:
897 interface = m.group(1).split(" ")[0]
898 ll_per_if_count = 0
899
900 # Interface ip
901 m1 = re_search("inet6 (fe80[:a-fA-F0-9]+/[0-9]+)", line)
902 if m1:
903 local = m1.group(1)
904 ll_per_if_count += 1
905 if ll_per_if_count > 1:
906 linklocal += [["%s-%s" % (interface, ll_per_if_count), local]]
907 else:
908 linklocal += [[interface, local]]
909
910 try:
911 if linklocal:
912 if intf:
913 return [
914 _linklocal[1]
915 for _linklocal in linklocal
916 if _linklocal[0] == intf
917 ][0].split("/")[0]
918 return linklocal
919 except IndexError:
920 continue
921
922 errormsg = "Link local ip missing on router {}".format(router)
923 return errormsg
924
925
926 def generate_support_bundle():
927 """
928 API to generate support bundle on any verification ste failure.
929 it runs a python utility, /usr/lib/frr/generate_support_bundle.py,
930 which basically runs defined CLIs and dumps the data to specified location
931 """
932
933 tgen = get_topogen()
934 router_list = tgen.routers()
935 test_name = os.environ.get("PYTEST_CURRENT_TEST").split(":")[-1].split(" ")[0]
936
937 bundle_procs = {}
938 for rname, rnode in router_list.items():
939 logger.info("Spawn collection of support bundle for %s", rname)
940 dst_bundle = "{}/{}/support_bundles/{}".format(tgen.logdir, rname, test_name)
941 rnode.run("mkdir -p " + dst_bundle)
942
943 gen_sup_cmd = [
944 "/usr/lib/frr/generate_support_bundle.py",
945 "--log-dir=" + dst_bundle,
946 ]
947 bundle_procs[rname] = tgen.net[rname].popen(gen_sup_cmd, stdin=None)
948
949 for rname, rnode in router_list.items():
950 logger.info("Waiting on support bundle for %s", rname)
951 output, error = bundle_procs[rname].communicate()
952 if output:
953 logger.info(
954 "Output from collecting support bundle for %s:\n%s", rname, output
955 )
956 if error:
957 logger.warning(
958 "Error from collecting support bundle for %s:\n%s", rname, error
959 )
960
961 return True
962
963
964 def start_topology(tgen, daemon=None):
965 """
966 Starting topology, create tmp files which are loaded to routers
967 to start daemons and then start routers
968 * `tgen` : topogen object
969 """
970
971 # Starting topology
972 tgen.start_topology()
973
974 # Starting daemons
975
976 router_list = tgen.routers()
977 routers_sorted = sorted(
978 router_list.keys(), key=lambda x: int(re_search("[0-9]+", x).group(0))
979 )
980
981 linux_ver = ""
982 router_list = tgen.routers()
983 for rname in routers_sorted:
984 router = router_list[rname]
985
986 # It will help in debugging the failures, will give more details on which
987 # specific kernel version tests are failing
988 if linux_ver == "":
989 linux_ver = router.run("uname -a")
990 logger.info("Logging platform related details: \n %s \n", linux_ver)
991
992 try:
993 os.chdir(tgen.logdir)
994
995 # # Creating router named dir and empty zebra.conf bgpd.conf files
996 # # inside the current directory
997 # if os.path.isdir("{}".format(rname)):
998 # os.system("rm -rf {}".format(rname))
999 # os.mkdir("{}".format(rname))
1000 # os.system("chmod -R go+rw {}".format(rname))
1001 # os.chdir("{}/{}".format(tgen.logdir, rname))
1002 # os.system("touch zebra.conf bgpd.conf")
1003 # else:
1004 # os.mkdir("{}".format(rname))
1005 # os.system("chmod -R go+rw {}".format(rname))
1006 # os.chdir("{}/{}".format(tgen.logdir, rname))
1007 # os.system("touch zebra.conf bgpd.conf")
1008
1009 except IOError as err:
1010 logger.error("I/O error({0}): {1}".format(err.errno, err.strerror))
1011
1012 # Loading empty zebra.conf file to router, to start the zebra daemon
1013 router.load_config(
1014 TopoRouter.RD_ZEBRA, "{}/{}/zebra.conf".format(tgen.logdir, rname)
1015 )
1016
1017 # Loading empty bgpd.conf file to router, to start the bgp daemon
1018 router.load_config(
1019 TopoRouter.RD_BGP, "{}/{}/bgpd.conf".format(tgen.logdir, rname)
1020 )
1021
1022 if daemon and "ospfd" in daemon:
1023 # Loading empty ospf.conf file to router, to start the bgp daemon
1024 router.load_config(
1025 TopoRouter.RD_OSPF, "{}/{}/ospfd.conf".format(tgen.logdir, rname)
1026 )
1027
1028 if daemon and "ospf6d" in daemon:
1029 # Loading empty ospf.conf file to router, to start the bgp daemon
1030 router.load_config(
1031 TopoRouter.RD_OSPF6, "{}/{}/ospf6d.conf".format(tgen.logdir, rname)
1032 )
1033
1034 if daemon and "pimd" in daemon:
1035 # Loading empty pimd.conf file to router, to start the pim deamon
1036 router.load_config(
1037 TopoRouter.RD_PIM, "{}/{}/pimd.conf".format(tgen.logdir, rname)
1038 )
1039
1040 if daemon and "pim6d" in daemon:
1041 # Loading empty pimd.conf file to router, to start the pim6d deamon
1042 router.load_config(
1043 TopoRouter.RD_PIM6, "{}/{}/pim6d.conf".format(tgen.logdir, rname)
1044 )
1045
1046 # Starting routers
1047 logger.info("Starting all routers once topology is created")
1048 tgen.start_router()
1049
1050
1051 def stop_router(tgen, router):
1052 """
1053 Router"s current config would be saved to /tmp/topotest/<suite>/<router> for each daemon
1054 and router and its daemons would be stopped.
1055
1056 * `tgen` : topogen object
1057 * `router`: Device under test
1058 """
1059
1060 router_list = tgen.routers()
1061
1062 # Saving router config to /etc/frr, which will be loaded to router
1063 # when it starts
1064 router_list[router].vtysh_cmd("write memory")
1065
1066 # Stop router
1067 router_list[router].stop()
1068
1069
1070 def start_router(tgen, router):
1071 """
1072 Router will be started and config would be loaded from /tmp/topotest/<suite>/<router> for each
1073 daemon
1074
1075 * `tgen` : topogen object
1076 * `router`: Device under test
1077 """
1078
1079 logger.debug("Entering lib API: start_router")
1080
1081 try:
1082 router_list = tgen.routers()
1083
1084 # Router and its daemons would be started and config would
1085 # be loaded to router for each daemon from /etc/frr
1086 router_list[router].start()
1087
1088 # Waiting for router to come up
1089 sleep(5)
1090
1091 except Exception as e:
1092 errormsg = traceback.format_exc()
1093 logger.error(errormsg)
1094 return errormsg
1095
1096 logger.debug("Exiting lib API: start_router()")
1097 return True
1098
1099
1100 def number_to_row(routerName):
1101 """
1102 Returns the number for the router.
1103 Calculation based on name a0 = row 0, a1 = row 1, b2 = row 2, z23 = row 23
1104 etc
1105 """
1106 return int(routerName[1:])
1107
1108
1109 def number_to_column(routerName):
1110 """
1111 Returns the number for the router.
1112 Calculation based on name a0 = columnn 0, a1 = column 0, b2= column 1,
1113 z23 = column 26 etc
1114 """
1115 return ord(routerName[0]) - 97
1116
1117
1118 def topo_daemons(tgen, topo=None):
1119 """
1120 Returns daemon list required for the suite based on topojson.
1121 """
1122 daemon_list = []
1123
1124 if topo is None:
1125 topo = tgen.json_topo
1126
1127 router_list = tgen.routers()
1128 routers_sorted = sorted(
1129 router_list.keys(), key=lambda x: int(re_search("[0-9]+", x).group(0))
1130 )
1131
1132 for rtr in routers_sorted:
1133 if "ospf" in topo["routers"][rtr] and "ospfd" not in daemon_list:
1134 daemon_list.append("ospfd")
1135
1136 if "ospf6" in topo["routers"][rtr] and "ospf6d" not in daemon_list:
1137 daemon_list.append("ospf6d")
1138
1139 for val in topo["routers"][rtr]["links"].values():
1140 if "pim" in val and "pimd" not in daemon_list:
1141 daemon_list.append("pimd")
1142 if "pim6" in val and "pim6d" not in daemon_list:
1143 daemon_list.append("pim6d")
1144 if "ospf" in val and "ospfd" not in daemon_list:
1145 daemon_list.append("ospfd")
1146 if "ospf6" in val and "ospf6d" not in daemon_list:
1147 daemon_list.append("ospf6d")
1148 break
1149
1150 return daemon_list
1151
1152
1153 def add_interfaces_to_vlan(tgen, input_dict):
1154 """
1155 Add interfaces to VLAN, we need vlan pakcage to be installed on machine
1156
1157 * `tgen`: tgen onject
1158 * `input_dict` : interfaces to be added to vlans
1159
1160 input_dict= {
1161 "r1":{
1162 "vlan":{
1163 VLAN_1: [{
1164 intf_r1_s1: {
1165 "ip": "10.1.1.1",
1166 "subnet": "255.255.255.0
1167 }
1168 }]
1169 }
1170 }
1171 }
1172
1173 add_interfaces_to_vlan(tgen, input_dict)
1174
1175 """
1176
1177 router_list = tgen.routers()
1178 for dut in input_dict.keys():
1179 rnode = router_list[dut]
1180
1181 if "vlan" in input_dict[dut]:
1182 for vlan, interfaces in input_dict[dut]["vlan"].items():
1183 for intf_dict in interfaces:
1184 for interface, data in intf_dict.items():
1185 # Adding interface to VLAN
1186 vlan_intf = "{}.{}".format(interface, vlan)
1187 cmd = "ip link add link {} name {} type vlan id {}".format(
1188 interface, vlan_intf, vlan
1189 )
1190 logger.info("[DUT: %s]: Running command: %s", dut, cmd)
1191 result = rnode.run(cmd)
1192 logger.info("result %s", result)
1193
1194 # Bringing interface up
1195 cmd = "ip link set {} up".format(vlan_intf)
1196 logger.info("[DUT: %s]: Running command: %s", dut, cmd)
1197 result = rnode.run(cmd)
1198 logger.info("result %s", result)
1199
1200 # Assigning IP address
1201 ifaddr = ipaddress.ip_interface(
1202 "{}/{}".format(
1203 frr_unicode(data["ip"]), frr_unicode(data["subnet"])
1204 )
1205 )
1206
1207 cmd = "ip -{0} a flush {1} scope global && ip a add {2} dev {1} && ip l set {1} up".format(
1208 ifaddr.version, vlan_intf, ifaddr
1209 )
1210 logger.info("[DUT: %s]: Running command: %s", dut, cmd)
1211 result = rnode.run(cmd)
1212 logger.info("result %s", result)
1213
1214
1215 def tcpdump_capture_start(
1216 tgen,
1217 router,
1218 intf,
1219 protocol=None,
1220 grepstr=None,
1221 timeout=0,
1222 options=None,
1223 cap_file=None,
1224 background=True,
1225 ):
1226 """
1227 API to capture network packets using tcp dump.
1228
1229 Packages used :
1230
1231 Parameters
1232 ----------
1233 * `tgen`: topogen object.
1234 * `router`: router on which ping has to be performed.
1235 * `intf` : interface for capture.
1236 * `protocol` : protocol for which packet needs to be captured.
1237 * `grepstr` : string to filter out tcp dump output.
1238 * `timeout` : Time for which packet needs to be captured.
1239 * `options` : options for TCP dump, all tcpdump options can be used.
1240 * `cap_file` : filename to store capture dump.
1241 * `background` : Make tcp dump run in back ground.
1242
1243 Usage
1244 -----
1245 tcpdump_result = tcpdump_dut(tgen, 'r2', intf, protocol='tcp', timeout=20,
1246 options='-A -vv -x > r2bgp.txt ')
1247 Returns
1248 -------
1249 1) True for successful capture
1250 2) errormsg - when tcp dump fails
1251 """
1252
1253 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
1254
1255 rnode = tgen.gears[router]
1256
1257 if timeout > 0:
1258 cmd = "timeout {}".format(timeout)
1259 else:
1260 cmd = ""
1261
1262 cmdargs = "{} tcpdump".format(cmd)
1263
1264 if intf:
1265 cmdargs += " -i {}".format(str(intf))
1266 if protocol:
1267 cmdargs += " {}".format(str(protocol))
1268 if options:
1269 cmdargs += " -s 0 {}".format(str(options))
1270
1271 if cap_file:
1272 file_name = os.path.join(tgen.logdir, router, cap_file)
1273 cmdargs += " -w {}".format(str(file_name))
1274 # Remove existing capture file
1275 rnode.run("rm -rf {}".format(file_name))
1276
1277 if grepstr:
1278 cmdargs += ' | grep "{}"'.format(str(grepstr))
1279
1280 logger.info("Running tcpdump command: [%s]", cmdargs)
1281 if not background:
1282 rnode.run(cmdargs)
1283 else:
1284 # XXX this & is bogus doesn't work
1285 # rnode.run("nohup {} & /dev/null 2>&1".format(cmdargs))
1286 rnode.run("nohup {} > /dev/null 2>&1".format(cmdargs))
1287
1288 # Check if tcpdump process is running
1289 if background:
1290 result = rnode.run("pgrep tcpdump")
1291 logger.debug("ps -ef | grep tcpdump \n {}".format(result))
1292
1293 if not result:
1294 errormsg = "tcpdump is not running {}".format("tcpdump")
1295 return errormsg
1296 else:
1297 logger.info("Packet capture started on %s: interface %s", router, intf)
1298
1299 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
1300 return True
1301
1302
1303 def tcpdump_capture_stop(tgen, router):
1304 """
1305 API to capture network packets using tcp dump.
1306
1307 Packages used :
1308
1309 Parameters
1310 ----------
1311 * `tgen`: topogen object.
1312 * `router`: router on which ping has to be performed.
1313 * `intf` : interface for capture.
1314 * `protocol` : protocol for which packet needs to be captured.
1315 * `grepstr` : string to filter out tcp dump output.
1316 * `timeout` : Time for which packet needs to be captured.
1317 * `options` : options for TCP dump, all tcpdump options can be used.
1318 * `cap2file` : filename to store capture dump.
1319 * `bakgrnd` : Make tcp dump run in back ground.
1320
1321 Usage
1322 -----
1323 tcpdump_result = tcpdump_dut(tgen, 'r2', intf, protocol='tcp', timeout=20,
1324 options='-A -vv -x > r2bgp.txt ')
1325 Returns
1326 -------
1327 1) True for successful capture
1328 2) errormsg - when tcp dump fails
1329 """
1330
1331 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
1332
1333 rnode = tgen.gears[router]
1334
1335 # Check if tcpdump process is running
1336 result = rnode.run("ps -ef | grep tcpdump")
1337 logger.debug("ps -ef | grep tcpdump \n {}".format(result))
1338
1339 if not re_search(r"{}".format("tcpdump"), result):
1340 errormsg = "tcpdump is not running {}".format("tcpdump")
1341 return errormsg
1342 else:
1343 # XXX this doesn't work with micronet
1344 ppid = tgen.net.nameToNode[rnode.name].pid
1345 rnode.run("set +m; pkill -P %s tcpdump &> /dev/null" % ppid)
1346 logger.info("Stopped tcpdump capture")
1347
1348 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
1349 return True
1350
1351
1352 def create_debug_log_config(tgen, input_dict, build=False):
1353 """
1354 Enable/disable debug logs for any protocol with defined debug
1355 options and logs would be saved to created log file
1356
1357 Parameters
1358 ----------
1359 * `tgen` : Topogen object
1360 * `input_dict` : details to enable debug logs for protocols
1361 * `build` : Only for initial setup phase this is set as True.
1362
1363
1364 Usage:
1365 ------
1366 input_dict = {
1367 "r2": {
1368 "debug":{
1369 "log_file" : "debug.log",
1370 "enable": ["pimd", "zebra"],
1371 "disable": {
1372 "bgpd":[
1373 'debug bgp neighbor-events',
1374 'debug bgp updates',
1375 'debug bgp zebra',
1376 ]
1377 }
1378 }
1379 }
1380 }
1381
1382 result = create_debug_log_config(tgen, input_dict)
1383
1384 Returns
1385 -------
1386 True or False
1387 """
1388
1389 result = False
1390 try:
1391 debug_config_dict = {}
1392
1393 for router in input_dict.keys():
1394 debug_config = []
1395 if "debug" in input_dict[router]:
1396 debug_dict = input_dict[router]["debug"]
1397
1398 disable_logs = debug_dict.setdefault("disable", None)
1399 enable_logs = debug_dict.setdefault("enable", None)
1400 log_file = debug_dict.setdefault("log_file", None)
1401
1402 if log_file:
1403 _log_file = os.path.join(tgen.logdir, log_file)
1404 debug_config.append("log file {} \n".format(_log_file))
1405
1406 if type(enable_logs) is list:
1407 for daemon in enable_logs:
1408 for debug_log in DEBUG_LOGS[daemon]:
1409 debug_config.append("{}".format(debug_log))
1410 elif type(enable_logs) is dict:
1411 for daemon, debug_logs in enable_logs.items():
1412 for debug_log in debug_logs:
1413 debug_config.append("{}".format(debug_log))
1414
1415 if type(disable_logs) is list:
1416 for daemon in disable_logs:
1417 for debug_log in DEBUG_LOGS[daemon]:
1418 debug_config.append("no {}".format(debug_log))
1419 elif type(disable_logs) is dict:
1420 for daemon, debug_logs in disable_logs.items():
1421 for debug_log in debug_logs:
1422 debug_config.append("no {}".format(debug_log))
1423 if debug_config:
1424 debug_config_dict[router] = debug_config
1425
1426 result = create_common_configurations(
1427 tgen, debug_config_dict, "debug_log_config", build=build
1428 )
1429 except InvalidCLIError:
1430 # Traceback
1431 errormsg = traceback.format_exc()
1432 logger.error(errormsg)
1433 return errormsg
1434
1435 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
1436 return result
1437
1438
1439 #############################################
1440 # Common APIs, will be used by all protocols
1441 #############################################
1442
1443
1444 def create_vrf_cfg(tgen, topo, input_dict=None, build=False):
1445 """
1446 Create vrf configuration for created topology. VRF
1447 configuration is provided in input json file.
1448
1449 VRF config is done in Linux Kernel:
1450 * Create VRF
1451 * Attach interface to VRF
1452 * Bring up VRF
1453
1454 Parameters
1455 ----------
1456 * `tgen` : Topogen object
1457 * `topo` : json file data
1458 * `input_dict` : Input dict data, required when configuring
1459 from testcase
1460 * `build` : Only for initial setup phase this is set as True.
1461
1462 Usage
1463 -----
1464 input_dict={
1465 "r3": {
1466 "links": {
1467 "r2-link1": {"ipv4": "auto", "ipv6": "auto", "vrf": "RED_A"},
1468 "r2-link2": {"ipv4": "auto", "ipv6": "auto", "vrf": "RED_B"},
1469 "r2-link3": {"ipv4": "auto", "ipv6": "auto", "vrf": "BLUE_A"},
1470 "r2-link4": {"ipv4": "auto", "ipv6": "auto", "vrf": "BLUE_B"},
1471 },
1472 "vrfs":[
1473 {
1474 "name": "RED_A",
1475 "id": "1"
1476 },
1477 {
1478 "name": "RED_B",
1479 "id": "2"
1480 },
1481 {
1482 "name": "BLUE_A",
1483 "id": "3",
1484 "delete": True
1485 },
1486 {
1487 "name": "BLUE_B",
1488 "id": "4"
1489 }
1490 ]
1491 }
1492 }
1493 result = create_vrf_cfg(tgen, topo, input_dict)
1494
1495 Returns
1496 -------
1497 True or False
1498 """
1499 result = True
1500 if not input_dict:
1501 input_dict = deepcopy(topo)
1502 else:
1503 input_dict = deepcopy(input_dict)
1504
1505 try:
1506 config_data_dict = {}
1507
1508 for c_router, c_data in input_dict.items():
1509 rnode = tgen.gears[c_router]
1510 config_data = []
1511 if "vrfs" in c_data:
1512 for vrf in c_data["vrfs"]:
1513 name = vrf.setdefault("name", None)
1514 table_id = vrf.setdefault("id", None)
1515 del_action = vrf.setdefault("delete", False)
1516
1517 if del_action:
1518 # Kernel cmd- Add VRF and table
1519 cmd = "ip link del {} type vrf table {}".format(
1520 vrf["name"], vrf["id"]
1521 )
1522
1523 logger.info("[DUT: %s]: Running kernel cmd [%s]", c_router, cmd)
1524 rnode.run(cmd)
1525
1526 # Kernel cmd - Bring down VRF
1527 cmd = "ip link set dev {} down".format(name)
1528 logger.info("[DUT: %s]: Running kernel cmd [%s]", c_router, cmd)
1529 rnode.run(cmd)
1530
1531 else:
1532 if name and table_id:
1533 # Kernel cmd- Add VRF and table
1534 cmd = "ip link add {} type vrf table {}".format(
1535 name, table_id
1536 )
1537 logger.info(
1538 "[DUT: %s]: Running kernel cmd " "[%s]", c_router, cmd
1539 )
1540 rnode.run(cmd)
1541
1542 # Kernel cmd - Bring up VRF
1543 cmd = "ip link set dev {} up".format(name)
1544 logger.info(
1545 "[DUT: %s]: Running kernel " "cmd [%s]", c_router, cmd
1546 )
1547 rnode.run(cmd)
1548
1549 for vrf in c_data["vrfs"]:
1550 vni = vrf.setdefault("vni", None)
1551 del_vni = vrf.setdefault("no_vni", None)
1552
1553 if "links" in c_data:
1554 for destRouterLink, data in sorted(c_data["links"].items()):
1555 # Loopback interfaces
1556 if "type" in data and data["type"] == "loopback":
1557 interface_name = destRouterLink
1558 else:
1559 interface_name = data["interface"]
1560
1561 if "vrf" in data:
1562 vrf_list = data["vrf"]
1563
1564 if type(vrf_list) is not list:
1565 vrf_list = [vrf_list]
1566
1567 for _vrf in vrf_list:
1568 cmd = "ip link set {} master {}".format(
1569 interface_name, _vrf
1570 )
1571
1572 logger.info(
1573 "[DUT: %s]: Running" " kernel cmd [%s]",
1574 c_router,
1575 cmd,
1576 )
1577 rnode.run(cmd)
1578
1579 if vni:
1580 config_data.append("vrf {}".format(vrf["name"]))
1581 cmd = "vni {}".format(vni)
1582 config_data.append(cmd)
1583
1584 if del_vni:
1585 config_data.append("vrf {}".format(vrf["name"]))
1586 cmd = "no vni {}".format(del_vni)
1587 config_data.append(cmd)
1588
1589 if config_data:
1590 config_data_dict[c_router] = config_data
1591
1592 result = create_common_configurations(
1593 tgen, config_data_dict, "vrf", build=build
1594 )
1595
1596 except InvalidCLIError:
1597 # Traceback
1598 errormsg = traceback.format_exc()
1599 logger.error(errormsg)
1600 return errormsg
1601
1602 return result
1603
1604
1605 def create_interface_in_kernel(
1606 tgen, dut, name, ip_addr, vrf=None, netmask=None, create=True
1607 ):
1608 """
1609 Cretae interfaces in kernel for ipv4/ipv6
1610 Config is done in Linux Kernel:
1611
1612 Parameters
1613 ----------
1614 * `tgen` : Topogen object
1615 * `dut` : Device for which interfaces to be added
1616 * `name` : interface name
1617 * `ip_addr` : ip address for interface
1618 * `vrf` : VRF name, to which interface will be associated
1619 * `netmask` : netmask value, default is None
1620 * `create`: Create interface in kernel, if created then no need
1621 to create
1622 """
1623
1624 rnode = tgen.gears[dut]
1625
1626 if create:
1627 cmd = "ip link show {0} >/dev/null || ip link add {0} type dummy".format(name)
1628 rnode.run(cmd)
1629
1630 if not netmask:
1631 ifaddr = ipaddress.ip_interface(frr_unicode(ip_addr))
1632 else:
1633 ifaddr = ipaddress.ip_interface(
1634 "{}/{}".format(frr_unicode(ip_addr), frr_unicode(netmask))
1635 )
1636 cmd = "ip -{0} a flush {1} scope global && ip a add {2} dev {1} && ip l set {1} up".format(
1637 ifaddr.version, name, ifaddr
1638 )
1639 logger.info("[DUT: %s]: Running command: %s", dut, cmd)
1640 rnode.run(cmd)
1641
1642 if vrf:
1643 cmd = "ip link set {} master {}".format(name, vrf)
1644 rnode.run(cmd)
1645
1646
1647 def shutdown_bringup_interface_in_kernel(tgen, dut, intf_name, ifaceaction=False):
1648 """
1649 Cretae interfaces in kernel for ipv4/ipv6
1650 Config is done in Linux Kernel:
1651
1652 Parameters
1653 ----------
1654 * `tgen` : Topogen object
1655 * `dut` : Device for which interfaces to be added
1656 * `intf_name` : interface name
1657 * `ifaceaction` : False to shutdown and True to bringup the
1658 ineterface
1659 """
1660
1661 rnode = tgen.gears[dut]
1662
1663 cmd = "ip link set dev"
1664 if ifaceaction:
1665 action = "up"
1666 cmd = "{} {} {}".format(cmd, intf_name, action)
1667 else:
1668 action = "down"
1669 cmd = "{} {} {}".format(cmd, intf_name, action)
1670
1671 logger.info("[DUT: %s]: Running command: %s", dut, cmd)
1672 rnode.run(cmd)
1673
1674
1675 def validate_ip_address(ip_address):
1676 """
1677 Validates the type of ip address
1678 Parameters
1679 ----------
1680 * `ip_address`: IPv4/IPv6 address
1681 Returns
1682 -------
1683 Type of address as string
1684 """
1685
1686 if "/" in ip_address:
1687 ip_address = ip_address.split("/")[0]
1688
1689 v4 = True
1690 v6 = True
1691 try:
1692 socket.inet_aton(ip_address)
1693 except socket.error as error:
1694 logger.debug("Not a valid IPv4 address")
1695 v4 = False
1696 else:
1697 return "ipv4"
1698
1699 try:
1700 socket.inet_pton(socket.AF_INET6, ip_address)
1701 except socket.error as error:
1702 logger.debug("Not a valid IPv6 address")
1703 v6 = False
1704 else:
1705 return "ipv6"
1706
1707 if not v4 and not v6:
1708 raise Exception(
1709 "InvalidIpAddr", "%s is neither valid IPv4 or IPv6" " address" % ip_address
1710 )
1711
1712
1713 def check_address_types(addr_type=None):
1714 """
1715 Checks environment variable set and compares with the current address type
1716 """
1717
1718 addr_types_env = os.environ.get("ADDRESS_TYPES")
1719 if not addr_types_env:
1720 addr_types_env = "dual"
1721
1722 if addr_types_env == "dual":
1723 addr_types = ["ipv4", "ipv6"]
1724 elif addr_types_env == "ipv4":
1725 addr_types = ["ipv4"]
1726 elif addr_types_env == "ipv6":
1727 addr_types = ["ipv6"]
1728
1729 if addr_type is None:
1730 return addr_types
1731
1732 if addr_type not in addr_types:
1733 logger.debug(
1734 "{} not in supported/configured address types {}".format(
1735 addr_type, addr_types
1736 )
1737 )
1738 return False
1739
1740 return True
1741
1742
1743 def generate_ips(network, no_of_ips):
1744 """
1745 Returns list of IPs.
1746 based on start_ip and no_of_ips
1747
1748 * `network` : from here the ip will start generating,
1749 start_ip will be
1750 * `no_of_ips` : these many IPs will be generated
1751 """
1752 ipaddress_list = []
1753 if type(network) is not list:
1754 network = [network]
1755
1756 for start_ipaddr in network:
1757 if "/" in start_ipaddr:
1758 start_ip = start_ipaddr.split("/")[0]
1759 mask = int(start_ipaddr.split("/")[1])
1760 else:
1761 logger.debug("start_ipaddr {} must have a / in it".format(start_ipaddr))
1762 assert 0
1763
1764 addr_type = validate_ip_address(start_ip)
1765 if addr_type == "ipv4":
1766 if start_ip == "0.0.0.0" and mask == 0 and no_of_ips == 1:
1767 ipaddress_list.append("{}/{}".format(start_ip, mask))
1768 return ipaddress_list
1769 start_ip = ipaddress.IPv4Address(frr_unicode(start_ip))
1770 step = 2 ** (32 - mask)
1771 elif addr_type == "ipv6":
1772 if start_ip == "0::0" and mask == 0 and no_of_ips == 1:
1773 ipaddress_list.append("{}/{}".format(start_ip, mask))
1774 return ipaddress_list
1775 start_ip = ipaddress.IPv6Address(frr_unicode(start_ip))
1776 step = 2 ** (128 - mask)
1777 else:
1778 return []
1779
1780 next_ip = start_ip
1781 count = 0
1782 while count < no_of_ips:
1783 ipaddress_list.append("{}/{}".format(next_ip, mask))
1784 if addr_type == "ipv6":
1785 next_ip = ipaddress.IPv6Address(int(next_ip) + step)
1786 else:
1787 next_ip += step
1788 count += 1
1789
1790 return ipaddress_list
1791
1792
1793 def find_interface_with_greater_ip(topo, router, loopback=True, interface=True):
1794 """
1795 Returns highest interface ip for ipv4/ipv6. If loopback is there then
1796 it will return highest IP from loopback IPs otherwise from physical
1797 interface IPs.
1798 * `topo` : json file data
1799 * `router` : router for which highest interface should be calculated
1800 """
1801
1802 link_data = topo["routers"][router]["links"]
1803 lo_list = []
1804 interfaces_list = []
1805 lo_exists = False
1806 for destRouterLink, data in sorted(link_data.items()):
1807 if loopback:
1808 if "type" in data and data["type"] == "loopback":
1809 lo_exists = True
1810 ip_address = topo["routers"][router]["links"][destRouterLink][
1811 "ipv4"
1812 ].split("/")[0]
1813 lo_list.append(ip_address)
1814 if interface:
1815 ip_address = topo["routers"][router]["links"][destRouterLink]["ipv4"].split(
1816 "/"
1817 )[0]
1818 interfaces_list.append(ip_address)
1819
1820 if lo_exists:
1821 return sorted(lo_list)[-1]
1822
1823 return sorted(interfaces_list)[-1]
1824
1825
1826 def write_test_header(tc_name):
1827 """Display message at beginning of test case"""
1828 count = 20
1829 logger.info("*" * (len(tc_name) + count))
1830 step("START -> Testcase : %s" % tc_name, reset=True)
1831 logger.info("*" * (len(tc_name) + count))
1832
1833
1834 def write_test_footer(tc_name):
1835 """Display message at end of test case"""
1836 count = 21
1837 logger.info("=" * (len(tc_name) + count))
1838 logger.info("Testcase : %s -> PASSED", tc_name)
1839 logger.info("=" * (len(tc_name) + count))
1840
1841
1842 def interface_status(tgen, topo, input_dict):
1843 """
1844 Delete ip route maps from device
1845 * `tgen` : Topogen object
1846 * `topo` : json file data
1847 * `input_dict` : for which router, route map has to be deleted
1848 Usage
1849 -----
1850 input_dict = {
1851 "r3": {
1852 "interface_list": ['eth1-r1-r2', 'eth2-r1-r3'],
1853 "status": "down"
1854 }
1855 }
1856 Returns
1857 -------
1858 errormsg(str) or True
1859 """
1860 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
1861
1862 try:
1863 rlist = []
1864
1865 for router in input_dict.keys():
1866
1867 interface_list = input_dict[router]["interface_list"]
1868 status = input_dict[router].setdefault("status", "up")
1869 for intf in interface_list:
1870 rnode = tgen.gears[router]
1871 interface_set_status(rnode, intf, status)
1872
1873 rlist.append(router)
1874
1875 # Load config to routers
1876 load_config_to_routers(tgen, rlist)
1877
1878 except Exception as e:
1879 errormsg = traceback.format_exc()
1880 logger.error(errormsg)
1881 return errormsg
1882
1883 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
1884 return True
1885
1886
1887 def retry(retry_timeout, initial_wait=0, expected=True, diag_pct=0.75):
1888 """
1889 Fixture: Retries function while it's return value is an errormsg (str), False, or it raises an exception.
1890
1891 * `retry_timeout`: Retry for at least this many seconds; after waiting initial_wait seconds
1892 * `initial_wait`: Sleeps for this many seconds before first executing function
1893 * `expected`: if False then the return logic is inverted, except for exceptions,
1894 (i.e., a False or errmsg (str) function return ends the retry loop,
1895 and returns that False or str value)
1896 * `diag_pct`: Percentage of `retry_timeout` to keep testing after negative result would have
1897 been returned in order to see if a positive result comes after. This is an
1898 important diagnostic tool, and normally should not be disabled. Calls to wrapped
1899 functions though, can override the `diag_pct` value to make it larger in case more
1900 diagnostic retrying is appropriate.
1901 """
1902
1903 def _retry(func):
1904 @wraps(func)
1905 def func_retry(*args, **kwargs):
1906 # We will continue to retry diag_pct of the timeout value to see if test would have passed with a
1907 # longer retry timeout value.
1908 saved_failure = None
1909
1910 retry_sleep = 2
1911
1912 # Allow the wrapped function's args to override the fixtures
1913 _retry_timeout = kwargs.pop("retry_timeout", retry_timeout)
1914 _expected = kwargs.pop("expected", expected)
1915 _initial_wait = kwargs.pop("initial_wait", initial_wait)
1916 _diag_pct = kwargs.pop("diag_pct", diag_pct)
1917
1918 start_time = datetime.now()
1919 retry_until = datetime.now() + timedelta(
1920 seconds=_retry_timeout + _initial_wait
1921 )
1922
1923 if initial_wait > 0:
1924 logger.info("Waiting for [%s]s as initial delay", initial_wait)
1925 sleep(initial_wait)
1926
1927 invert_logic = not _expected
1928 while True:
1929 seconds_left = (retry_until - datetime.now()).total_seconds()
1930 try:
1931 ret = func(*args, **kwargs)
1932 logger.debug("Function returned %s", ret)
1933
1934 negative_result = ret is False or is_string(ret)
1935 if negative_result == invert_logic:
1936 # Simple case, successful result in time
1937 if not saved_failure:
1938 return ret
1939
1940 # Positive result, but happened after timeout failure, very important to
1941 # note for fixing tests.
1942 logger.warning(
1943 "RETRY DIAGNOSTIC: SUCCEED after FAILED with requested timeout of %.1fs; however, succeeded in %.1fs, investigate timeout timing",
1944 _retry_timeout,
1945 (datetime.now() - start_time).total_seconds(),
1946 )
1947 if isinstance(saved_failure, Exception):
1948 raise saved_failure # pylint: disable=E0702
1949 return saved_failure
1950
1951 except Exception as error:
1952 logger.info("Function raised exception: %s", str(error))
1953 ret = error
1954
1955 if seconds_left < 0 and saved_failure:
1956 logger.info(
1957 "RETRY DIAGNOSTIC: Retry timeout reached, still failing"
1958 )
1959 if isinstance(saved_failure, Exception):
1960 raise saved_failure # pylint: disable=E0702
1961 return saved_failure
1962
1963 if seconds_left < 0:
1964 logger.info("Retry timeout of %ds reached", _retry_timeout)
1965
1966 saved_failure = ret
1967 retry_extra_delta = timedelta(
1968 seconds=seconds_left + _retry_timeout * _diag_pct
1969 )
1970 retry_until = datetime.now() + retry_extra_delta
1971 seconds_left = retry_extra_delta.total_seconds()
1972
1973 # Generate bundle after setting remaining diagnostic retry time
1974 generate_support_bundle()
1975
1976 # If user has disabled diagnostic retries return now
1977 if not _diag_pct:
1978 if isinstance(saved_failure, Exception):
1979 raise saved_failure
1980 return saved_failure
1981
1982 if saved_failure:
1983 logger.info(
1984 "RETRY DIAG: [failure] Sleeping %ds until next retry with %.1f retry time left - too see if timeout was too short",
1985 retry_sleep,
1986 seconds_left,
1987 )
1988 else:
1989 logger.info(
1990 "Sleeping %ds until next retry with %.1f retry time left",
1991 retry_sleep,
1992 seconds_left,
1993 )
1994 sleep(retry_sleep)
1995
1996 func_retry._original = func
1997 return func_retry
1998
1999 return _retry
2000
2001
2002 class Stepper:
2003 """
2004 Prints step number for the test case step being executed
2005 """
2006
2007 count = 1
2008
2009 def __call__(self, msg, reset):
2010 if reset:
2011 Stepper.count = 1
2012 logger.info(msg)
2013 else:
2014 logger.info("STEP %s: '%s'", Stepper.count, msg)
2015 Stepper.count += 1
2016
2017
2018 def step(msg, reset=False):
2019 """
2020 Call Stepper to print test steps. Need to reset at the beginning of test.
2021 * ` msg` : Step message body.
2022 * `reset` : Reset step count to 1 when set to True.
2023 """
2024 _step = Stepper()
2025 _step(msg, reset)
2026
2027
2028 def do_countdown(secs):
2029 """
2030 Countdown timer display
2031 """
2032 for i in range(secs, 0, -1):
2033 sys.stdout.write("{} ".format(str(i)))
2034 sys.stdout.flush()
2035 sleep(1)
2036 return
2037
2038
2039 #############################################
2040 # These APIs, will used by testcase
2041 #############################################
2042 def create_interfaces_cfg(tgen, topo, build=False):
2043 """
2044 Create interface configuration for created topology. Basic Interface
2045 configuration is provided in input json file.
2046
2047 Parameters
2048 ----------
2049 * `tgen` : Topogen object
2050 * `topo` : json file data
2051 * `build` : Only for initial setup phase this is set as True.
2052
2053 Returns
2054 -------
2055 True or False
2056 """
2057
2058 def _create_interfaces_ospf_cfg(ospf, c_data, data, ospf_keywords):
2059 interface_data = []
2060 ip_ospf = "ipv6 ospf6" if ospf == "ospf6" else "ip ospf"
2061 for keyword in ospf_keywords:
2062 if keyword in data[ospf]:
2063 intf_ospf_value = c_data["links"][destRouterLink][ospf][keyword]
2064 if "delete" in data and data["delete"]:
2065 interface_data.append(
2066 "no {} {}".format(ip_ospf, keyword.replace("_", "-"))
2067 )
2068 else:
2069 interface_data.append(
2070 "{} {} {}".format(
2071 ip_ospf, keyword.replace("_", "-"), intf_ospf_value
2072 )
2073 )
2074 return interface_data
2075
2076 result = False
2077 topo = deepcopy(topo)
2078
2079 try:
2080 interface_data_dict = {}
2081
2082 for c_router, c_data in topo.items():
2083 interface_data = []
2084 for destRouterLink, data in sorted(c_data["links"].items()):
2085 # Loopback interfaces
2086 if "type" in data and data["type"] == "loopback":
2087 interface_name = destRouterLink
2088 else:
2089 interface_name = data["interface"]
2090
2091 interface_data.append("interface {}".format(str(interface_name)))
2092
2093 if "ipv4" in data:
2094 intf_addr = c_data["links"][destRouterLink]["ipv4"]
2095
2096 if "delete" in data and data["delete"]:
2097 interface_data.append("no ip address {}".format(intf_addr))
2098 else:
2099 interface_data.append("ip address {}".format(intf_addr))
2100 if "ipv6" in data:
2101 intf_addr = c_data["links"][destRouterLink]["ipv6"]
2102
2103 if "delete" in data and data["delete"]:
2104 interface_data.append("no ipv6 address {}".format(intf_addr))
2105 else:
2106 interface_data.append("ipv6 address {}".format(intf_addr))
2107
2108 # Wait for vrf interfaces to get link local address once they are up
2109 if (
2110 not destRouterLink == "lo"
2111 and "vrf" in topo[c_router]["links"][destRouterLink]
2112 ):
2113 vrf = topo[c_router]["links"][destRouterLink]["vrf"]
2114 intf = topo[c_router]["links"][destRouterLink]["interface"]
2115 ll = get_frr_ipv6_linklocal(tgen, c_router, intf=intf, vrf=vrf)
2116
2117 if "ipv6-link-local" in data:
2118 intf_addr = c_data["links"][destRouterLink]["ipv6-link-local"]
2119
2120 if "delete" in data and data["delete"]:
2121 interface_data.append("no ipv6 address {}".format(intf_addr))
2122 else:
2123 interface_data.append("ipv6 address {}\n".format(intf_addr))
2124
2125 ospf_keywords = [
2126 "hello_interval",
2127 "dead_interval",
2128 "network",
2129 "priority",
2130 "cost",
2131 "mtu_ignore",
2132 ]
2133 if "ospf" in data:
2134 interface_data += _create_interfaces_ospf_cfg(
2135 "ospf", c_data, data, ospf_keywords + ["area"]
2136 )
2137 if "ospf6" in data:
2138 interface_data += _create_interfaces_ospf_cfg(
2139 "ospf6", c_data, data, ospf_keywords + ["area"]
2140 )
2141 if interface_data:
2142 interface_data_dict[c_router] = interface_data
2143
2144 result = create_common_configurations(
2145 tgen, interface_data_dict, "interface_config", build=build
2146 )
2147
2148 except InvalidCLIError:
2149 # Traceback
2150 errormsg = traceback.format_exc()
2151 logger.error(errormsg)
2152 return errormsg
2153
2154 return result
2155
2156
2157 def create_static_routes(tgen, input_dict, build=False):
2158 """
2159 Create static routes for given router as defined in input_dict
2160
2161 Parameters
2162 ----------
2163 * `tgen` : Topogen object
2164 * `input_dict` : Input dict data, required when configuring from testcase
2165 * `build` : Only for initial setup phase this is set as True.
2166
2167 Usage
2168 -----
2169 input_dict should be in the format below:
2170 # static_routes: list of all routes
2171 # network: network address
2172 # no_of_ip: number of next-hop address that will be configured
2173 # admin_distance: admin distance for route/routes.
2174 # next_hop: starting next-hop address
2175 # tag: tag id for static routes
2176 # vrf: VRF name in which static routes needs to be created
2177 # delete: True if config to be removed. Default False.
2178
2179 Example:
2180 "routers": {
2181 "r1": {
2182 "static_routes": [
2183 {
2184 "network": "100.0.20.1/32",
2185 "no_of_ip": 9,
2186 "admin_distance": 100,
2187 "next_hop": "10.0.0.1",
2188 "tag": 4001,
2189 "vrf": "RED_A"
2190 "delete": true
2191 }
2192 ]
2193 }
2194 }
2195
2196 Returns
2197 -------
2198 errormsg(str) or True
2199 """
2200 result = False
2201 logger.debug("Entering lib API: create_static_routes()")
2202 input_dict = deepcopy(input_dict)
2203
2204 try:
2205 static_routes_list_dict = {}
2206
2207 for router in input_dict.keys():
2208 if "static_routes" not in input_dict[router]:
2209 errormsg = "static_routes not present in input_dict"
2210 logger.info(errormsg)
2211 continue
2212
2213 static_routes_list = []
2214
2215 static_routes = input_dict[router]["static_routes"]
2216 for static_route in static_routes:
2217 del_action = static_route.setdefault("delete", False)
2218 no_of_ip = static_route.setdefault("no_of_ip", 1)
2219 network = static_route.setdefault("network", [])
2220 if type(network) is not list:
2221 network = [network]
2222
2223 admin_distance = static_route.setdefault("admin_distance", None)
2224 tag = static_route.setdefault("tag", None)
2225 vrf = static_route.setdefault("vrf", None)
2226 interface = static_route.setdefault("interface", None)
2227 next_hop = static_route.setdefault("next_hop", None)
2228 nexthop_vrf = static_route.setdefault("nexthop_vrf", None)
2229
2230 ip_list = generate_ips(network, no_of_ip)
2231 for ip in ip_list:
2232 addr_type = validate_ip_address(ip)
2233
2234 if addr_type == "ipv4":
2235 cmd = "ip route {}".format(ip)
2236 else:
2237 cmd = "ipv6 route {}".format(ip)
2238
2239 if interface:
2240 cmd = "{} {}".format(cmd, interface)
2241
2242 if next_hop:
2243 cmd = "{} {}".format(cmd, next_hop)
2244
2245 if nexthop_vrf:
2246 cmd = "{} nexthop-vrf {}".format(cmd, nexthop_vrf)
2247
2248 if vrf:
2249 cmd = "{} vrf {}".format(cmd, vrf)
2250
2251 if tag:
2252 cmd = "{} tag {}".format(cmd, str(tag))
2253
2254 if admin_distance:
2255 cmd = "{} {}".format(cmd, admin_distance)
2256
2257 if del_action:
2258 cmd = "no {}".format(cmd)
2259
2260 static_routes_list.append(cmd)
2261
2262 if static_routes_list:
2263 static_routes_list_dict[router] = static_routes_list
2264
2265 result = create_common_configurations(
2266 tgen, static_routes_list_dict, "static_route", build=build
2267 )
2268
2269 except InvalidCLIError:
2270 # Traceback
2271 errormsg = traceback.format_exc()
2272 logger.error(errormsg)
2273 return errormsg
2274
2275 logger.debug("Exiting lib API: create_static_routes()")
2276 return result
2277
2278
2279 def create_prefix_lists(tgen, input_dict, build=False):
2280 """
2281 Create ip prefix lists as per the config provided in input
2282 JSON or input_dict
2283 Parameters
2284 ----------
2285 * `tgen` : Topogen object
2286 * `input_dict` : Input dict data, required when configuring from testcase
2287 * `build` : Only for initial setup phase this is set as True.
2288 Usage
2289 -----
2290 # pf_lists_1: name of prefix-list, user defined
2291 # seqid: prefix-list seqid, auto-generated if not given by user
2292 # network: criteria for applying prefix-list
2293 # action: permit/deny
2294 # le: less than or equal number of bits
2295 # ge: greater than or equal number of bits
2296 Example
2297 -------
2298 input_dict = {
2299 "r1": {
2300 "prefix_lists":{
2301 "ipv4": {
2302 "pf_list_1": [
2303 {
2304 "seqid": 10,
2305 "network": "any",
2306 "action": "permit",
2307 "le": "32",
2308 "ge": "30",
2309 "delete": True
2310 }
2311 ]
2312 }
2313 }
2314 }
2315 }
2316 Returns
2317 -------
2318 errormsg or True
2319 """
2320
2321 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
2322 result = False
2323 try:
2324 config_data_dict = {}
2325
2326 for router in input_dict.keys():
2327 if "prefix_lists" not in input_dict[router]:
2328 errormsg = "prefix_lists not present in input_dict"
2329 logger.debug(errormsg)
2330 continue
2331
2332 config_data = []
2333 prefix_lists = input_dict[router]["prefix_lists"]
2334 for addr_type, prefix_data in prefix_lists.items():
2335 if not check_address_types(addr_type):
2336 continue
2337
2338 for prefix_name, prefix_list in prefix_data.items():
2339 for prefix_dict in prefix_list:
2340 if "action" not in prefix_dict or "network" not in prefix_dict:
2341 errormsg = "'action' or network' missing in" " input_dict"
2342 return errormsg
2343
2344 network_addr = prefix_dict["network"]
2345 action = prefix_dict["action"]
2346 le = prefix_dict.setdefault("le", None)
2347 ge = prefix_dict.setdefault("ge", None)
2348 seqid = prefix_dict.setdefault("seqid", None)
2349 del_action = prefix_dict.setdefault("delete", False)
2350 if seqid is None:
2351 seqid = get_seq_id("prefix_lists", router, prefix_name)
2352 else:
2353 set_seq_id("prefix_lists", router, seqid, prefix_name)
2354
2355 if addr_type == "ipv4":
2356 protocol = "ip"
2357 else:
2358 protocol = "ipv6"
2359
2360 cmd = "{} prefix-list {} seq {} {} {}".format(
2361 protocol, prefix_name, seqid, action, network_addr
2362 )
2363 if le:
2364 cmd = "{} le {}".format(cmd, le)
2365 if ge:
2366 cmd = "{} ge {}".format(cmd, ge)
2367
2368 if del_action:
2369 cmd = "no {}".format(cmd)
2370
2371 config_data.append(cmd)
2372 if config_data:
2373 config_data_dict[router] = config_data
2374
2375 result = create_common_configurations(
2376 tgen, config_data_dict, "prefix_list", build=build
2377 )
2378
2379 except InvalidCLIError:
2380 # Traceback
2381 errormsg = traceback.format_exc()
2382 logger.error(errormsg)
2383 return errormsg
2384
2385 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
2386 return result
2387
2388
2389 def create_route_maps(tgen, input_dict, build=False):
2390 """
2391 Create route-map on the devices as per the arguments passed
2392 Parameters
2393 ----------
2394 * `tgen` : Topogen object
2395 * `input_dict` : Input dict data, required when configuring from testcase
2396 * `build` : Only for initial setup phase this is set as True.
2397 Usage
2398 -----
2399 # route_maps: key, value pair for route-map name and its attribute
2400 # rmap_match_prefix_list_1: user given name for route-map
2401 # action: PERMIT/DENY
2402 # match: key,value pair for match criteria. prefix_list, community-list,
2403 large-community-list or tag. Only one option at a time.
2404 # prefix_list: name of prefix list
2405 # large-community-list: name of large community list
2406 # community-ist: name of community list
2407 # tag: tag id for static routes
2408 # set: key, value pair for modifying route attributes
2409 # localpref: preference value for the network
2410 # med: metric value advertised for AS
2411 # aspath: set AS path value
2412 # weight: weight for the route
2413 # community: standard community value to be attached
2414 # large_community: large community value to be attached
2415 # community_additive: if set to "additive", adds community/large-community
2416 value to the existing values of the network prefix
2417 Example:
2418 --------
2419 input_dict = {
2420 "r1": {
2421 "route_maps": {
2422 "rmap_match_prefix_list_1": [
2423 {
2424 "action": "PERMIT",
2425 "match": {
2426 "ipv4": {
2427 "prefix_list": "pf_list_1"
2428 }
2429 "ipv6": {
2430 "prefix_list": "pf_list_1"
2431 }
2432 "large-community-list": {
2433 "id": "community_1",
2434 "exact_match": True
2435 }
2436 "community_list": {
2437 "id": "community_2",
2438 "exact_match": True
2439 }
2440 "tag": "tag_id"
2441 },
2442 "set": {
2443 "locPrf": 150,
2444 "metric": 30,
2445 "path": {
2446 "num": 20000,
2447 "action": "prepend",
2448 },
2449 "weight": 500,
2450 "community": {
2451 "num": "1:2 2:3",
2452 "action": additive
2453 }
2454 "large_community": {
2455 "num": "1:2:3 4:5;6",
2456 "action": additive
2457 },
2458 }
2459 }
2460 ]
2461 }
2462 }
2463 }
2464 Returns
2465 -------
2466 errormsg(str) or True
2467 """
2468
2469 result = False
2470 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
2471 input_dict = deepcopy(input_dict)
2472 try:
2473 rmap_data_dict = {}
2474
2475 for router in input_dict.keys():
2476 if "route_maps" not in input_dict[router]:
2477 logger.debug("route_maps not present in input_dict")
2478 continue
2479 rmap_data = []
2480 for rmap_name, rmap_value in input_dict[router]["route_maps"].items():
2481
2482 for rmap_dict in rmap_value:
2483 del_action = rmap_dict.setdefault("delete", False)
2484
2485 if del_action:
2486 rmap_data.append("no route-map {}".format(rmap_name))
2487 continue
2488
2489 if "action" not in rmap_dict:
2490 errormsg = "action not present in input_dict"
2491 logger.error(errormsg)
2492 return False
2493
2494 rmap_action = rmap_dict.setdefault("action", "deny")
2495
2496 seq_id = rmap_dict.setdefault("seq_id", None)
2497 if seq_id is None:
2498 seq_id = get_seq_id("route_maps", router, rmap_name)
2499 else:
2500 set_seq_id("route_maps", router, seq_id, rmap_name)
2501
2502 rmap_data.append(
2503 "route-map {} {} {}".format(rmap_name, rmap_action, seq_id)
2504 )
2505
2506 if "continue" in rmap_dict:
2507 continue_to = rmap_dict["continue"]
2508 if continue_to:
2509 rmap_data.append("on-match goto {}".format(continue_to))
2510 else:
2511 logger.error(
2512 "In continue, 'route-map entry "
2513 "sequence number' is not provided"
2514 )
2515 return False
2516
2517 if "goto" in rmap_dict:
2518 go_to = rmap_dict["goto"]
2519 if go_to:
2520 rmap_data.append("on-match goto {}".format(go_to))
2521 else:
2522 logger.error(
2523 "In goto, 'Goto Clause number' is not" " provided"
2524 )
2525 return False
2526
2527 if "call" in rmap_dict:
2528 call_rmap = rmap_dict["call"]
2529 if call_rmap:
2530 rmap_data.append("call {}".format(call_rmap))
2531 else:
2532 logger.error(
2533 "In call, 'destination Route-Map' is" " not provided"
2534 )
2535 return False
2536
2537 # Verifying if SET criteria is defined
2538 if "set" in rmap_dict:
2539 set_data = rmap_dict["set"]
2540 ipv4_data = set_data.setdefault("ipv4", {})
2541 ipv6_data = set_data.setdefault("ipv6", {})
2542 local_preference = set_data.setdefault("locPrf", None)
2543 metric = set_data.setdefault("metric", None)
2544 metric_type = set_data.setdefault("metric-type", None)
2545 as_path = set_data.setdefault("path", {})
2546 weight = set_data.setdefault("weight", None)
2547 community = set_data.setdefault("community", {})
2548 large_community = set_data.setdefault("large_community", {})
2549 large_comm_list = set_data.setdefault("large_comm_list", {})
2550 set_action = set_data.setdefault("set_action", None)
2551 nexthop = set_data.setdefault("nexthop", None)
2552 origin = set_data.setdefault("origin", None)
2553 ext_comm_list = set_data.setdefault("extcommunity", {})
2554 metrictype = set_data.setdefault("metric-type", {})
2555
2556 # Local Preference
2557 if local_preference:
2558 rmap_data.append(
2559 "set local-preference {}".format(local_preference)
2560 )
2561
2562 # Metric-Type
2563 if metrictype:
2564 rmap_data.append("set metric-type {}\n".format(metrictype))
2565
2566 # Metric
2567 if metric:
2568 del_comm = set_data.setdefault("delete", None)
2569 if del_comm:
2570 rmap_data.append("no set metric {}".format(metric))
2571 else:
2572 rmap_data.append("set metric {}".format(metric))
2573
2574 # Origin
2575 if origin:
2576 rmap_data.append("set origin {} \n".format(origin))
2577
2578 # AS Path Prepend
2579 if as_path:
2580 as_num = as_path.setdefault("as_num", None)
2581 as_action = as_path.setdefault("as_action", None)
2582 if as_action and as_num:
2583 rmap_data.append(
2584 "set as-path {} {}".format(as_action, as_num)
2585 )
2586
2587 # Community
2588 if community:
2589 num = community.setdefault("num", None)
2590 comm_action = community.setdefault("action", None)
2591 if num:
2592 cmd = "set community {}".format(num)
2593 if comm_action:
2594 cmd = "{} {}".format(cmd, comm_action)
2595 rmap_data.append(cmd)
2596 else:
2597 logger.error("In community, AS Num not" " provided")
2598 return False
2599
2600 if large_community:
2601 num = large_community.setdefault("num", None)
2602 comm_action = large_community.setdefault("action", None)
2603 if num:
2604 cmd = "set large-community {}".format(num)
2605 if comm_action:
2606 cmd = "{} {}".format(cmd, comm_action)
2607
2608 rmap_data.append(cmd)
2609 else:
2610 logger.error(
2611 "In large_community, AS Num not" " provided"
2612 )
2613 return False
2614 if large_comm_list:
2615 id = large_comm_list.setdefault("id", None)
2616 del_comm = large_comm_list.setdefault("delete", None)
2617 if id:
2618 cmd = "set large-comm-list {}".format(id)
2619 if del_comm:
2620 cmd = "{} delete".format(cmd)
2621
2622 rmap_data.append(cmd)
2623 else:
2624 logger.error("In large_comm_list 'id' not" " provided")
2625 return False
2626
2627 if ext_comm_list:
2628 rt = ext_comm_list.setdefault("rt", None)
2629 del_comm = ext_comm_list.setdefault("delete", None)
2630 if rt:
2631 cmd = "set extcommunity rt {}".format(rt)
2632 if del_comm:
2633 cmd = "{} delete".format(cmd)
2634
2635 rmap_data.append(cmd)
2636 else:
2637 logger.debug("In ext_comm_list 'rt' not" " provided")
2638 return False
2639
2640 # Weight
2641 if weight:
2642 rmap_data.append("set weight {}".format(weight))
2643 if ipv6_data:
2644 nexthop = ipv6_data.setdefault("nexthop", None)
2645 if nexthop:
2646 rmap_data.append("set ipv6 next-hop {}".format(nexthop))
2647
2648 # Adding MATCH and SET sequence to RMAP if defined
2649 if "match" in rmap_dict:
2650 match_data = rmap_dict["match"]
2651 ipv4_data = match_data.setdefault("ipv4", {})
2652 ipv6_data = match_data.setdefault("ipv6", {})
2653 community = match_data.setdefault("community_list", {})
2654 large_community = match_data.setdefault("large_community", {})
2655 large_community_list = match_data.setdefault(
2656 "large_community_list", {}
2657 )
2658
2659 metric = match_data.setdefault("metric", None)
2660 source_vrf = match_data.setdefault("source-vrf", None)
2661
2662 if ipv4_data:
2663 # fetch prefix list data from rmap
2664 prefix_name = ipv4_data.setdefault("prefix_lists", None)
2665 if prefix_name:
2666 rmap_data.append(
2667 "match ip address"
2668 " prefix-list {}".format(prefix_name)
2669 )
2670
2671 # fetch tag data from rmap
2672 tag = ipv4_data.setdefault("tag", None)
2673 if tag:
2674 rmap_data.append("match tag {}".format(tag))
2675
2676 # fetch large community data from rmap
2677 large_community_list = ipv4_data.setdefault(
2678 "large_community_list", {}
2679 )
2680 large_community = match_data.setdefault(
2681 "large_community", {}
2682 )
2683
2684 if ipv6_data:
2685 prefix_name = ipv6_data.setdefault("prefix_lists", None)
2686 if prefix_name:
2687 rmap_data.append(
2688 "match ipv6 address"
2689 " prefix-list {}".format(prefix_name)
2690 )
2691
2692 # fetch tag data from rmap
2693 tag = ipv6_data.setdefault("tag", None)
2694 if tag:
2695 rmap_data.append("match tag {}".format(tag))
2696
2697 # fetch large community data from rmap
2698 large_community_list = ipv6_data.setdefault(
2699 "large_community_list", {}
2700 )
2701 large_community = match_data.setdefault(
2702 "large_community", {}
2703 )
2704
2705 if community:
2706 if "id" not in community:
2707 logger.error(
2708 "'id' is mandatory for "
2709 "community-list in match"
2710 " criteria"
2711 )
2712 return False
2713 cmd = "match community {}".format(community["id"])
2714 exact_match = community.setdefault("exact_match", False)
2715 if exact_match:
2716 cmd = "{} exact-match".format(cmd)
2717
2718 rmap_data.append(cmd)
2719 if large_community:
2720 if "id" not in large_community:
2721 logger.error(
2722 "'id' is mandatory for "
2723 "large-community-list in match "
2724 "criteria"
2725 )
2726 return False
2727 cmd = "match large-community {}".format(
2728 large_community["id"]
2729 )
2730 exact_match = large_community.setdefault(
2731 "exact_match", False
2732 )
2733 if exact_match:
2734 cmd = "{} exact-match".format(cmd)
2735 rmap_data.append(cmd)
2736 if large_community_list:
2737 if "id" not in large_community_list:
2738 logger.error(
2739 "'id' is mandatory for "
2740 "large-community-list in match "
2741 "criteria"
2742 )
2743 return False
2744 cmd = "match large-community {}".format(
2745 large_community_list["id"]
2746 )
2747 exact_match = large_community_list.setdefault(
2748 "exact_match", False
2749 )
2750 if exact_match:
2751 cmd = "{} exact-match".format(cmd)
2752 rmap_data.append(cmd)
2753
2754 if source_vrf:
2755 cmd = "match source-vrf {}".format(source_vrf)
2756 rmap_data.append(cmd)
2757
2758 if metric:
2759 cmd = "match metric {}".format(metric)
2760 rmap_data.append(cmd)
2761
2762 if rmap_data:
2763 rmap_data_dict[router] = rmap_data
2764
2765 result = create_common_configurations(
2766 tgen, rmap_data_dict, "route_maps", build=build
2767 )
2768
2769 except InvalidCLIError:
2770 # Traceback
2771 errormsg = traceback.format_exc()
2772 logger.error(errormsg)
2773 return errormsg
2774
2775 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
2776 return result
2777
2778
2779 def delete_route_maps(tgen, input_dict):
2780 """
2781 Delete ip route maps from device
2782 * `tgen` : Topogen object
2783 * `input_dict` : for which router,
2784 route map has to be deleted
2785 Usage
2786 -----
2787 # Delete route-map rmap_1 and rmap_2 from router r1
2788 input_dict = {
2789 "r1": {
2790 "route_maps": ["rmap_1", "rmap__2"]
2791 }
2792 }
2793 result = delete_route_maps("ipv4", input_dict)
2794 Returns
2795 -------
2796 errormsg(str) or True
2797 """
2798 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
2799
2800 for router in input_dict.keys():
2801 route_maps = input_dict[router]["route_maps"][:]
2802 rmap_data = input_dict[router]
2803 rmap_data["route_maps"] = {}
2804 for route_map_name in route_maps:
2805 rmap_data["route_maps"].update({route_map_name: [{"delete": True}]})
2806
2807 return create_route_maps(tgen, input_dict)
2808
2809
2810 def create_bgp_community_lists(tgen, input_dict, build=False):
2811 """
2812 Create bgp community-list or large-community-list on the devices as per
2813 the arguments passed. Takes list of communities in input.
2814 Parameters
2815 ----------
2816 * `tgen` : Topogen object
2817 * `input_dict` : Input dict data, required when configuring from testcase
2818 * `build` : Only for initial setup phase this is set as True.
2819 Usage
2820 -----
2821 input_dict_1 = {
2822 "r3": {
2823 "bgp_community_lists": [
2824 {
2825 "community_type": "standard",
2826 "action": "permit",
2827 "name": "rmap_lcomm_{}".format(addr_type),
2828 "value": "1:1:1 1:2:3 2:1:1 2:2:2",
2829 "large": True
2830 }
2831 ]
2832 }
2833 }
2834 }
2835 result = create_bgp_community_lists(tgen, input_dict_1)
2836 """
2837
2838 result = False
2839 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
2840 input_dict = deepcopy(input_dict)
2841 try:
2842 config_data_dict = {}
2843
2844 for router in input_dict.keys():
2845 if "bgp_community_lists" not in input_dict[router]:
2846 errormsg = "bgp_community_lists not present in input_dict"
2847 logger.debug(errormsg)
2848 continue
2849
2850 config_data = []
2851
2852 community_list = input_dict[router]["bgp_community_lists"]
2853 for community_dict in community_list:
2854 del_action = community_dict.setdefault("delete", False)
2855 community_type = community_dict.setdefault("community_type", None)
2856 action = community_dict.setdefault("action", None)
2857 value = community_dict.setdefault("value", "")
2858 large = community_dict.setdefault("large", None)
2859 name = community_dict.setdefault("name", None)
2860 if large:
2861 cmd = "bgp large-community-list"
2862 else:
2863 cmd = "bgp community-list"
2864
2865 if not large and not (community_type and action and value):
2866 errormsg = (
2867 "community_type, action and value are "
2868 "required in bgp_community_list"
2869 )
2870 logger.error(errormsg)
2871 return False
2872
2873 cmd = "{} {} {} {} {}".format(cmd, community_type, name, action, value)
2874
2875 if del_action:
2876 cmd = "no {}".format(cmd)
2877
2878 config_data.append(cmd)
2879
2880 if config_data:
2881 config_data_dict[router] = config_data
2882
2883 result = create_common_configurations(
2884 tgen, config_data_dict, "bgp_community_list", build=build
2885 )
2886
2887 except InvalidCLIError:
2888 # Traceback
2889 errormsg = traceback.format_exc()
2890 logger.error(errormsg)
2891 return errormsg
2892
2893 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
2894 return result
2895
2896
2897 def shutdown_bringup_interface(tgen, dut, intf_name, ifaceaction=False):
2898 """
2899 Shutdown or bringup router's interface "
2900 * `tgen` : Topogen object
2901 * `dut` : Device under test
2902 * `intf_name` : Interface name to be shut/no shut
2903 * `ifaceaction` : Action, to shut/no shut interface,
2904 by default is False
2905 Usage
2906 -----
2907 dut = "r3"
2908 intf = "r3-r1-eth0"
2909 # Shut down interface
2910 shutdown_bringup_interface(tgen, dut, intf, False)
2911 # Bring up interface
2912 shutdown_bringup_interface(tgen, dut, intf, True)
2913 Returns
2914 -------
2915 errormsg(str) or True
2916 """
2917
2918 router_list = tgen.routers()
2919 if ifaceaction:
2920 logger.info("Bringing up interface {} : {}".format(dut, intf_name))
2921 else:
2922 logger.info("Shutting down interface {} : {}".format(dut, intf_name))
2923
2924 interface_set_status(router_list[dut], intf_name, ifaceaction)
2925
2926
2927 def addKernelRoute(
2928 tgen, router, intf, group_addr_range, next_hop=None, src=None, del_action=None
2929 ):
2930 """
2931 Add route to kernel
2932
2933 Parameters:
2934 -----------
2935 * `tgen` : Topogen object
2936 * `router`: router for which kernel routes needs to be added
2937 * `intf`: interface name, for which kernel routes needs to be added
2938 * `bindToAddress`: bind to <host>, an interface or multicast
2939 address
2940
2941 returns:
2942 --------
2943 errormsg or True
2944 """
2945
2946 logger.debug("Entering lib API: addKernelRoute()")
2947
2948 rnode = tgen.gears[router]
2949
2950 if type(group_addr_range) is not list:
2951 group_addr_range = [group_addr_range]
2952
2953 for grp_addr in group_addr_range:
2954
2955 addr_type = validate_ip_address(grp_addr)
2956 if addr_type == "ipv4":
2957 if next_hop is not None:
2958 cmd = "ip route add {} via {}".format(grp_addr, next_hop)
2959 else:
2960 cmd = "ip route add {} dev {}".format(grp_addr, intf)
2961 if del_action:
2962 cmd = "ip route del {}".format(grp_addr)
2963 verify_cmd = "ip route"
2964 elif addr_type == "ipv6":
2965 if intf and src:
2966 cmd = "ip -6 route add {} dev {} src {}".format(grp_addr, intf, src)
2967 else:
2968 cmd = "ip -6 route add {} via {}".format(grp_addr, next_hop)
2969 verify_cmd = "ip -6 route"
2970 if del_action:
2971 cmd = "ip -6 route del {}".format(grp_addr)
2972
2973 logger.info("[DUT: {}]: Running command: [{}]".format(router, cmd))
2974 output = rnode.run(cmd)
2975
2976 def check_in_kernel(rnode, verify_cmd, grp_addr, router):
2977 # Verifying if ip route added to kernel
2978 errormsg = None
2979 result = rnode.run(verify_cmd)
2980 logger.debug("{}\n{}".format(verify_cmd, result))
2981 if "/" in grp_addr:
2982 ip, mask = grp_addr.split("/")
2983 if mask == "32" or mask == "128":
2984 grp_addr = ip
2985 else:
2986 mask = "32" if addr_type == "ipv4" else "128"
2987
2988 if not re_search(r"{}".format(grp_addr), result) and mask != "0":
2989 errormsg = (
2990 "[DUT: {}]: Kernal route is not added for group"
2991 " address {} Config output: {}".format(
2992 router, grp_addr, output
2993 )
2994 )
2995
2996 return errormsg
2997
2998 test_func = functools.partial(
2999 check_in_kernel, rnode, verify_cmd, grp_addr, router
3000 )
3001 (result, out) = topotest.run_and_expect(test_func, None, count=20, wait=1)
3002 assert result, out
3003
3004 logger.debug("Exiting lib API: addKernelRoute()")
3005 return True
3006
3007
3008 def configure_vxlan(tgen, input_dict):
3009 """
3010 Add and configure vxlan
3011
3012 * `tgen`: tgen object
3013 * `input_dict` : data for vxlan config
3014
3015 Usage:
3016 ------
3017 input_dict= {
3018 "dcg2":{
3019 "vxlan":[{
3020 "vxlan_name": "vxlan75100",
3021 "vxlan_id": "75100",
3022 "dstport": 4789,
3023 "local_addr": "120.0.0.1",
3024 "learning": "no",
3025 "delete": True
3026 }]
3027 }
3028 }
3029
3030 configure_vxlan(tgen, input_dict)
3031
3032 Returns:
3033 -------
3034 True or errormsg
3035
3036 """
3037
3038 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
3039
3040 router_list = tgen.routers()
3041 for dut in input_dict.keys():
3042 rnode = router_list[dut]
3043
3044 if "vxlan" in input_dict[dut]:
3045 for vxlan_dict in input_dict[dut]["vxlan"]:
3046 cmd = "ip link "
3047
3048 del_vxlan = vxlan_dict.setdefault("delete", None)
3049 vxlan_names = vxlan_dict.setdefault("vxlan_name", [])
3050 vxlan_ids = vxlan_dict.setdefault("vxlan_id", [])
3051 dstport = vxlan_dict.setdefault("dstport", None)
3052 local_addr = vxlan_dict.setdefault("local_addr", None)
3053 learning = vxlan_dict.setdefault("learning", None)
3054
3055 config_data = []
3056 if vxlan_names and vxlan_ids:
3057 for vxlan_name, vxlan_id in zip(vxlan_names, vxlan_ids):
3058 cmd = "ip link"
3059
3060 if del_vxlan:
3061 cmd = "{} del {} type vxlan id {}".format(
3062 cmd, vxlan_name, vxlan_id
3063 )
3064 else:
3065 cmd = "{} add {} type vxlan id {}".format(
3066 cmd, vxlan_name, vxlan_id
3067 )
3068
3069 if dstport:
3070 cmd = "{} dstport {}".format(cmd, dstport)
3071
3072 if local_addr:
3073 ip_cmd = "ip addr add {} dev {}".format(
3074 local_addr, vxlan_name
3075 )
3076 if del_vxlan:
3077 ip_cmd = "ip addr del {} dev {}".format(
3078 local_addr, vxlan_name
3079 )
3080
3081 config_data.append(ip_cmd)
3082
3083 cmd = "{} local {}".format(cmd, local_addr)
3084
3085 if learning == "no":
3086 cmd = "{} nolearning".format(cmd)
3087
3088 elif learning == "yes":
3089 cmd = "{} learning".format(cmd)
3090
3091 config_data.append(cmd)
3092
3093 try:
3094 for _cmd in config_data:
3095 logger.info("[DUT: %s]: Running command: %s", dut, _cmd)
3096 rnode.run(_cmd)
3097
3098 except InvalidCLIError:
3099 # Traceback
3100 errormsg = traceback.format_exc()
3101 logger.error(errormsg)
3102 return errormsg
3103
3104 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
3105
3106 return True
3107
3108
3109 def configure_brctl(tgen, topo, input_dict):
3110 """
3111 Add and configure brctl
3112
3113 * `tgen`: tgen object
3114 * `input_dict` : data for brctl config
3115
3116 Usage:
3117 ------
3118 input_dict= {
3119 dut:{
3120 "brctl": [{
3121 "brctl_name": "br100",
3122 "addvxlan": "vxlan75100",
3123 "vrf": "RED",
3124 "stp": "off"
3125 }]
3126 }
3127 }
3128
3129 configure_brctl(tgen, topo, input_dict)
3130
3131 Returns:
3132 -------
3133 True or errormsg
3134
3135 """
3136
3137 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
3138
3139 router_list = tgen.routers()
3140 for dut in input_dict.keys():
3141 rnode = router_list[dut]
3142
3143 if "brctl" in input_dict[dut]:
3144 for brctl_dict in input_dict[dut]["brctl"]:
3145
3146 brctl_names = brctl_dict.setdefault("brctl_name", [])
3147 addvxlans = brctl_dict.setdefault("addvxlan", [])
3148 stp_values = brctl_dict.setdefault("stp", [])
3149 vrfs = brctl_dict.setdefault("vrf", [])
3150
3151 ip_cmd = "ip link set"
3152 for brctl_name, vxlan, vrf, stp in zip(
3153 brctl_names, addvxlans, vrfs, stp_values
3154 ):
3155
3156 ip_cmd_list = []
3157 cmd = "ip link add name {} type bridge stp_state {}".format(
3158 brctl_name, stp
3159 )
3160
3161 logger.info("[DUT: %s]: Running command: %s", dut, cmd)
3162 rnode.run(cmd)
3163
3164 ip_cmd_list.append("{} up dev {}".format(ip_cmd, brctl_name))
3165
3166 if vxlan:
3167 cmd = "{} dev {} master {}".format(ip_cmd, vxlan, brctl_name)
3168
3169 logger.info("[DUT: %s]: Running command: %s", dut, cmd)
3170 rnode.run(cmd)
3171
3172 ip_cmd_list.append("{} up dev {}".format(ip_cmd, vxlan))
3173
3174 if vrf:
3175 ip_cmd_list.append(
3176 "{} dev {} master {}".format(ip_cmd, brctl_name, vrf)
3177 )
3178
3179 for intf_name, data in topo["routers"][dut]["links"].items():
3180 if "vrf" not in data:
3181 continue
3182
3183 if data["vrf"] == vrf:
3184 ip_cmd_list.append(
3185 "{} up dev {}".format(ip_cmd, data["interface"])
3186 )
3187
3188 try:
3189 for _ip_cmd in ip_cmd_list:
3190 logger.info("[DUT: %s]: Running command: %s", dut, _ip_cmd)
3191 rnode.run(_ip_cmd)
3192
3193 except InvalidCLIError:
3194 # Traceback
3195 errormsg = traceback.format_exc()
3196 logger.error(errormsg)
3197 return errormsg
3198
3199 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
3200 return True
3201
3202
3203 def configure_interface_mac(tgen, input_dict):
3204 """
3205 Add and configure brctl
3206
3207 * `tgen`: tgen object
3208 * `input_dict` : data for mac config
3209
3210 input_mac= {
3211 "edge1":{
3212 "br75100": "00:80:48:BA:d1:00,
3213 "br75200": "00:80:48:BA:d1:00
3214 }
3215 }
3216
3217 configure_interface_mac(tgen, input_mac)
3218
3219 Returns:
3220 -------
3221 True or errormsg
3222
3223 """
3224
3225 router_list = tgen.routers()
3226 for dut in input_dict.keys():
3227 rnode = router_list[dut]
3228
3229 for intf, mac in input_dict[dut].items():
3230 cmd = "ip link set {} address {}".format(intf, mac)
3231 logger.info("[DUT: %s]: Running command: %s", dut, cmd)
3232
3233 try:
3234 result = rnode.run(cmd)
3235 if len(result) != 0:
3236 return result
3237
3238 except InvalidCLIError:
3239 # Traceback
3240 errormsg = traceback.format_exc()
3241 logger.error(errormsg)
3242 return errormsg
3243
3244 return True
3245
3246
3247 def socat_send_mld_join(
3248 tgen,
3249 server,
3250 protocol_option,
3251 mld_groups,
3252 send_from_intf,
3253 send_from_intf_ip=None,
3254 port=12345,
3255 reuseaddr=True,
3256 ):
3257 """
3258 API to send MLD join using SOCAT tool
3259
3260 Parameters:
3261 -----------
3262 * `tgen` : Topogen object
3263 * `server`: iperf server, from where IGMP join would be sent
3264 * `protocol_option`: Protocol options, ex: UDP6-RECV
3265 * `mld_groups`: IGMP group for which join has to be sent
3266 * `send_from_intf`: Interface from which join would be sent
3267 * `send_from_intf_ip`: Interface IP, default is None
3268 * `port`: Port to be used, default is 12345
3269 * `reuseaddr`: True|False, bydefault True
3270
3271 returns:
3272 --------
3273 errormsg or True
3274 """
3275
3276 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
3277
3278 rnode = tgen.routers()[server]
3279 socat_args = "socat -u "
3280
3281 # UDP4/TCP4/UDP6/UDP6-RECV/UDP6-SEND
3282 if protocol_option:
3283 socat_args += "{}".format(protocol_option)
3284
3285 if port:
3286 socat_args += ":{},".format(port)
3287
3288 if reuseaddr:
3289 socat_args += "{},".format("reuseaddr")
3290
3291 # Group address range to cover
3292 if mld_groups:
3293 if not isinstance(mld_groups, list):
3294 mld_groups = [mld_groups]
3295
3296 for mld_group in mld_groups:
3297 socat_cmd = socat_args
3298 join_option = "ipv6-join-group"
3299
3300 if send_from_intf and not send_from_intf_ip:
3301 socat_cmd += "{}='[{}]:{}'".format(join_option, mld_group, send_from_intf)
3302 else:
3303 socat_cmd += "{}='[{}]:{}:[{}]'".format(
3304 join_option, mld_group, send_from_intf, send_from_intf_ip
3305 )
3306
3307 socat_cmd += " STDOUT"
3308
3309 socat_cmd += " &>{}/socat.logs &".format(tgen.logdir)
3310
3311 # Run socat command to send IGMP join
3312 logger.info("[DUT: {}]: Running command: [{}]".format(server, socat_cmd))
3313 output = rnode.run("set +m; {} sleep 0.5".format(socat_cmd))
3314
3315 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
3316 return True
3317
3318
3319 def socat_send_pim6_traffic(
3320 tgen,
3321 server,
3322 protocol_option,
3323 mld_groups,
3324 send_from_intf,
3325 port=12345,
3326 multicast_hops=True,
3327 ):
3328 """
3329 API to send pim6 data taffic using SOCAT tool
3330
3331 Parameters:
3332 -----------
3333 * `tgen` : Topogen object
3334 * `server`: iperf server, from where IGMP join would be sent
3335 * `protocol_option`: Protocol options, ex: UDP6-RECV
3336 * `mld_groups`: MLD group for which join has to be sent
3337 * `send_from_intf`: Interface from which join would be sent
3338 * `port`: Port to be used, default is 12345
3339 * `multicast_hops`: multicast-hops count, default is 255
3340
3341 returns:
3342 --------
3343 errormsg or True
3344 """
3345
3346 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
3347
3348 rnode = tgen.routers()[server]
3349 socat_args = "socat -u STDIO "
3350
3351 # UDP4/TCP4/UDP6/UDP6-RECV/UDP6-SEND
3352 if protocol_option:
3353 socat_args += "'{}".format(protocol_option)
3354
3355 # Group address range to cover
3356 if mld_groups:
3357 if not isinstance(mld_groups, list):
3358 mld_groups = [mld_groups]
3359
3360 for mld_group in mld_groups:
3361 socat_cmd = socat_args
3362 if port:
3363 socat_cmd += ":[{}]:{},".format(mld_group, port)
3364
3365 if send_from_intf:
3366 socat_cmd += "interface={0},so-bindtodevice={0},".format(send_from_intf)
3367
3368 if multicast_hops:
3369 socat_cmd += "multicast-hops=255'"
3370
3371 socat_cmd += " &>{}/socat.logs &".format(tgen.logdir)
3372
3373 # Run socat command to send pim6 traffic
3374 logger.info(
3375 "[DUT: {}]: Running command: [set +m; ( while sleep 1; do date; done ) | {}]".format(
3376 server, socat_cmd
3377 )
3378 )
3379
3380 # Open a shell script file and write data to it, which will be
3381 # used to send pim6 traffic continously
3382 traffic_shell_script = "{}/{}/traffic.sh".format(tgen.logdir, server)
3383 with open("{}".format(traffic_shell_script), "w") as taffic_sh:
3384 taffic_sh.write(
3385 "#!/usr/bin/env bash\n( while sleep 1; do date; done ) | {}\n".format(
3386 socat_cmd
3387 )
3388 )
3389
3390 rnode.run("chmod 755 {}".format(traffic_shell_script))
3391 output = rnode.run("{} &> /dev/null".format(traffic_shell_script))
3392
3393 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
3394 return True
3395
3396
3397 def kill_socat(tgen, dut=None, action=None):
3398 """
3399 Killing socat process if running for any router in topology
3400
3401 Parameters:
3402 -----------
3403 * `tgen` : Topogen object
3404 * `dut` : Any iperf hostname to send igmp prune
3405 * `action`: to kill mld join using socat
3406 to kill mld traffic using socat
3407
3408 Usage:
3409 ------
3410 kill_socat(tgen, dut ="i6", action="remove_mld_join")
3411
3412 """
3413
3414 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
3415
3416 router_list = tgen.routers()
3417 for router, rnode in router_list.items():
3418 if dut is not None and router != dut:
3419 continue
3420
3421 if action == "remove_mld_join":
3422 cmd = "ps -ef | grep socat | grep UDP6-RECV | grep {}".format(router)
3423 elif action == "remove_mld_traffic":
3424 cmd = "ps -ef | grep socat | grep UDP6-SEND | grep {}".format(router)
3425 else:
3426 cmd = "ps -ef | grep socat".format(router)
3427
3428 awk_cmd = "awk -F' ' '{print $2}' | xargs kill -9 &>/dev/null &"
3429 cmd = "{} | {}".format(cmd, awk_cmd)
3430
3431 logger.debug("[DUT: {}]: Running command: [{}]".format(router, cmd))
3432 rnode.run(cmd)
3433
3434 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
3435
3436
3437 #############################################
3438 # Verification APIs
3439 #############################################
3440 @retry(retry_timeout=40)
3441 def verify_rib(
3442 tgen,
3443 addr_type,
3444 dut,
3445 input_dict,
3446 next_hop=None,
3447 protocol=None,
3448 tag=None,
3449 metric=None,
3450 fib=None,
3451 count_only=False,
3452 admin_distance=None,
3453 ):
3454 """
3455 Data will be read from input_dict or input JSON file, API will generate
3456 same prefixes, which were redistributed by either create_static_routes() or
3457 advertise_networks_using_network_command() and do will verify next_hop and
3458 each prefix/routes is present in "show ip/ipv6 route {bgp/stataic} json"
3459 command o/p.
3460
3461 Parameters
3462 ----------
3463 * `tgen` : topogen object
3464 * `addr_type` : ip type, ipv4/ipv6
3465 * `dut`: Device Under Test, for which user wants to test the data
3466 * `input_dict` : input dict, has details of static routes
3467 * `next_hop`[optional]: next_hop which needs to be verified,
3468 default: static
3469 * `protocol`[optional]: protocol, default = None
3470 * `count_only`[optional]: count of nexthops only, not specific addresses,
3471 default = False
3472
3473 Usage
3474 -----
3475 # RIB can be verified for static routes OR network advertised using
3476 network command. Following are input_dicts to create static routes
3477 and advertise networks using network command. Any one of the input_dict
3478 can be passed to verify_rib() to verify routes in DUT"s RIB.
3479
3480 # Creating static routes for r1
3481 input_dict = {
3482 "r1": {
3483 "static_routes": [{"network": "10.0.20.1/32", "no_of_ip": 9, \
3484 "admin_distance": 100, "next_hop": "10.0.0.2", "tag": 4001}]
3485 }}
3486 # Advertising networks using network command in router r1
3487 input_dict = {
3488 "r1": {
3489 "advertise_networks": [{"start_ip": "20.0.0.0/32",
3490 "no_of_network": 10},
3491 {"start_ip": "30.0.0.0/32"}]
3492 }}
3493 # Verifying ipv4 routes in router r1 learned via BGP
3494 dut = "r2"
3495 protocol = "bgp"
3496 result = verify_rib(tgen, "ipv4", dut, input_dict, protocol = protocol)
3497
3498 Returns
3499 -------
3500 errormsg(str) or True
3501 """
3502
3503 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
3504
3505 router_list = tgen.routers()
3506 additional_nexthops_in_required_nhs = []
3507 found_hops = []
3508 for routerInput in input_dict.keys():
3509 for router, rnode in router_list.items():
3510 if router != dut:
3511 continue
3512
3513 logger.info("Checking router %s RIB:", router)
3514
3515 # Verifying RIB routes
3516 if addr_type == "ipv4":
3517 command = "show ip route"
3518 else:
3519 command = "show ipv6 route"
3520
3521 found_routes = []
3522 missing_routes = []
3523
3524 if "static_routes" in input_dict[routerInput]:
3525 static_routes = input_dict[routerInput]["static_routes"]
3526
3527 for static_route in static_routes:
3528 if "vrf" in static_route and static_route["vrf"] is not None:
3529
3530 logger.info(
3531 "[DUT: {}]: Verifying routes for VRF:"
3532 " {}".format(router, static_route["vrf"])
3533 )
3534
3535 cmd = "{} vrf {}".format(command, static_route["vrf"])
3536
3537 else:
3538 cmd = "{}".format(command)
3539
3540 if protocol:
3541 cmd = "{} {}".format(cmd, protocol)
3542
3543 cmd = "{} json".format(cmd)
3544
3545 rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True)
3546
3547 # Verifying output dictionary rib_routes_json is not empty
3548 if bool(rib_routes_json) is False:
3549 errormsg = "No route found in rib of router {}..".format(router)
3550 return errormsg
3551
3552 network = static_route["network"]
3553 if "no_of_ip" in static_route:
3554 no_of_ip = static_route["no_of_ip"]
3555 else:
3556 no_of_ip = 1
3557
3558 if "tag" in static_route:
3559 _tag = static_route["tag"]
3560 else:
3561 _tag = None
3562
3563 # Generating IPs for verification
3564 ip_list = generate_ips(network, no_of_ip)
3565 st_found = False
3566 nh_found = False
3567
3568 for st_rt in ip_list:
3569 st_rt = str(
3570 ipaddress.ip_network(frr_unicode(st_rt), strict=False)
3571 )
3572 _addr_type = validate_ip_address(st_rt)
3573 if _addr_type != addr_type:
3574 continue
3575
3576 if st_rt in rib_routes_json:
3577 st_found = True
3578 found_routes.append(st_rt)
3579
3580 if "queued" in rib_routes_json[st_rt][0]:
3581 errormsg = "Route {} is queued\n".format(st_rt)
3582 return errormsg
3583
3584 if fib and next_hop:
3585 if type(next_hop) is not list:
3586 next_hop = [next_hop]
3587
3588 for mnh in range(0, len(rib_routes_json[st_rt])):
3589 if not "selected" in rib_routes_json[st_rt][mnh]:
3590 continue
3591
3592 if (
3593 "fib"
3594 in rib_routes_json[st_rt][mnh]["nexthops"][0]
3595 ):
3596 found_hops.append(
3597 [
3598 rib_r["ip"]
3599 for rib_r in rib_routes_json[st_rt][
3600 mnh
3601 ]["nexthops"]
3602 ]
3603 )
3604
3605 if found_hops[0]:
3606 missing_list_of_nexthops = set(
3607 found_hops[0]
3608 ).difference(next_hop)
3609 additional_nexthops_in_required_nhs = set(
3610 next_hop
3611 ).difference(found_hops[0])
3612
3613 if additional_nexthops_in_required_nhs:
3614 logger.info(
3615 "Nexthop "
3616 "%s is not active for route %s in "
3617 "RIB of router %s\n",
3618 additional_nexthops_in_required_nhs,
3619 st_rt,
3620 dut,
3621 )
3622 errormsg = (
3623 "Nexthop {} is not active"
3624 " for route {} in RIB of router"
3625 " {}\n".format(
3626 additional_nexthops_in_required_nhs,
3627 st_rt,
3628 dut,
3629 )
3630 )
3631 return errormsg
3632 else:
3633 nh_found = True
3634
3635 elif next_hop and fib is None:
3636 if type(next_hop) is not list:
3637 next_hop = [next_hop]
3638 found_hops = [
3639 rib_r["ip"]
3640 for rib_r in rib_routes_json[st_rt][0]["nexthops"]
3641 if "ip" in rib_r
3642 ]
3643
3644 # If somehow key "ip" is not found in nexthops JSON
3645 # then found_hops would be 0, this particular
3646 # situation will be handled here
3647 if not len(found_hops):
3648 errormsg = (
3649 "Nexthop {} is Missing for "
3650 "route {} in RIB of router {}\n".format(
3651 next_hop,
3652 st_rt,
3653 dut,
3654 )
3655 )
3656 return errormsg
3657
3658 # Check only the count of nexthops
3659 if count_only:
3660 if len(next_hop) == len(found_hops):
3661 nh_found = True
3662 else:
3663 errormsg = (
3664 "Nexthops are missing for "
3665 "route {} in RIB of router {}: "
3666 "expected {}, found {}\n".format(
3667 st_rt,
3668 dut,
3669 len(next_hop),
3670 len(found_hops),
3671 )
3672 )
3673 return errormsg
3674
3675 # Check the actual nexthops
3676 elif found_hops:
3677 missing_list_of_nexthops = set(
3678 found_hops
3679 ).difference(next_hop)
3680 additional_nexthops_in_required_nhs = set(
3681 next_hop
3682 ).difference(found_hops)
3683
3684 if additional_nexthops_in_required_nhs:
3685 logger.info(
3686 "Missing nexthop %s for route"
3687 " %s in RIB of router %s\n",
3688 additional_nexthops_in_required_nhs,
3689 st_rt,
3690 dut,
3691 )
3692 errormsg = (
3693 "Nexthop {} is Missing for "
3694 "route {} in RIB of router {}\n".format(
3695 additional_nexthops_in_required_nhs,
3696 st_rt,
3697 dut,
3698 )
3699 )
3700 return errormsg
3701 else:
3702 nh_found = True
3703
3704 if tag:
3705 if "tag" not in rib_routes_json[st_rt][0]:
3706 errormsg = (
3707 "[DUT: {}]: tag is not"
3708 " present for"
3709 " route {} in RIB \n".format(dut, st_rt)
3710 )
3711 return errormsg
3712
3713 if _tag != rib_routes_json[st_rt][0]["tag"]:
3714 errormsg = (
3715 "[DUT: {}]: tag value {}"
3716 " is not matched for"
3717 " route {} in RIB \n".format(
3718 dut,
3719 _tag,
3720 st_rt,
3721 )
3722 )
3723 return errormsg
3724
3725 if admin_distance is not None:
3726 if "distance" not in rib_routes_json[st_rt][0]:
3727 errormsg = (
3728 "[DUT: {}]: admin distance is"
3729 " not present for"
3730 " route {} in RIB \n".format(dut, st_rt)
3731 )
3732 return errormsg
3733
3734 if (
3735 admin_distance
3736 != rib_routes_json[st_rt][0]["distance"]
3737 ):
3738 errormsg = (
3739 "[DUT: {}]: admin distance value "
3740 "{} is not matched for "
3741 "route {} in RIB \n".format(
3742 dut,
3743 admin_distance,
3744 st_rt,
3745 )
3746 )
3747 return errormsg
3748
3749 if metric is not None:
3750 if "metric" not in rib_routes_json[st_rt][0]:
3751 errormsg = (
3752 "[DUT: {}]: metric is"
3753 " not present for"
3754 " route {} in RIB \n".format(dut, st_rt)
3755 )
3756 return errormsg
3757
3758 if metric != rib_routes_json[st_rt][0]["metric"]:
3759 errormsg = (
3760 "[DUT: {}]: metric value "
3761 "{} is not matched for "
3762 "route {} in RIB \n".format(
3763 dut,
3764 metric,
3765 st_rt,
3766 )
3767 )
3768 return errormsg
3769
3770 else:
3771 missing_routes.append(st_rt)
3772
3773 if nh_found:
3774 logger.info(
3775 "[DUT: {}]: Found next_hop {} for"
3776 " RIB routes: {}".format(router, next_hop, found_routes)
3777 )
3778
3779 if len(missing_routes) > 0:
3780 errormsg = "[DUT: {}]: Missing route in RIB, " "routes: {}".format(
3781 dut, missing_routes
3782 )
3783 return errormsg
3784
3785 if found_routes:
3786 logger.info(
3787 "[DUT: %s]: Verified routes in RIB, found" " routes are: %s\n",
3788 dut,
3789 found_routes,
3790 )
3791
3792 continue
3793
3794 if "bgp" in input_dict[routerInput]:
3795 if (
3796 "advertise_networks"
3797 not in input_dict[routerInput]["bgp"]["address_family"][addr_type][
3798 "unicast"
3799 ]
3800 ):
3801 continue
3802
3803 found_routes = []
3804 missing_routes = []
3805 advertise_network = input_dict[routerInput]["bgp"]["address_family"][
3806 addr_type
3807 ]["unicast"]["advertise_networks"]
3808
3809 # Continue if there are no network advertise
3810 if len(advertise_network) == 0:
3811 continue
3812
3813 for advertise_network_dict in advertise_network:
3814 if "vrf" in advertise_network_dict:
3815 cmd = "{} vrf {} json".format(
3816 command, advertise_network_dict["vrf"]
3817 )
3818 else:
3819 cmd = "{} json".format(command)
3820
3821 rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True)
3822
3823 # Verifying output dictionary rib_routes_json is not empty
3824 if bool(rib_routes_json) is False:
3825 errormsg = "No route found in rib of router {}..".format(router)
3826 return errormsg
3827
3828 start_ip = advertise_network_dict["network"]
3829 if "no_of_network" in advertise_network_dict:
3830 no_of_network = advertise_network_dict["no_of_network"]
3831 else:
3832 no_of_network = 1
3833
3834 # Generating IPs for verification
3835 ip_list = generate_ips(start_ip, no_of_network)
3836 st_found = False
3837 nh_found = False
3838
3839 for st_rt in ip_list:
3840 st_rt = str(ipaddress.ip_network(frr_unicode(st_rt), strict=False))
3841
3842 _addr_type = validate_ip_address(st_rt)
3843 if _addr_type != addr_type:
3844 continue
3845
3846 if st_rt in rib_routes_json:
3847 st_found = True
3848 found_routes.append(st_rt)
3849
3850 if "queued" in rib_routes_json[st_rt][0]:
3851 errormsg = "Route {} is queued\n".format(st_rt)
3852 return errormsg
3853
3854 if next_hop:
3855 if type(next_hop) is not list:
3856 next_hop = [next_hop]
3857
3858 count = 0
3859 for nh in next_hop:
3860 for nh_dict in rib_routes_json[st_rt][0]["nexthops"]:
3861 if nh_dict["ip"] != nh:
3862 continue
3863 else:
3864 count += 1
3865
3866 if count == len(next_hop):
3867 nh_found = True
3868 else:
3869 errormsg = (
3870 "Nexthop {} is Missing"
3871 " for route {} in "
3872 "RIB of router {}\n".format(next_hop, st_rt, dut)
3873 )
3874 return errormsg
3875 else:
3876 missing_routes.append(st_rt)
3877
3878 if nh_found:
3879 logger.info(
3880 "Found next_hop {} for all routes in RIB"
3881 " of router {}\n".format(next_hop, dut)
3882 )
3883
3884 if len(missing_routes) > 0:
3885 errormsg = (
3886 "Missing {} route in RIB of router {}, "
3887 "routes: {} \n".format(addr_type, dut, missing_routes)
3888 )
3889 return errormsg
3890
3891 if found_routes:
3892 logger.info(
3893 "Verified {} routes in router {} RIB, found"
3894 " routes are: {}\n".format(addr_type, dut, found_routes)
3895 )
3896
3897 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
3898 return True
3899
3900
3901 @retry(retry_timeout=12)
3902 def verify_fib_routes(tgen, addr_type, dut, input_dict, next_hop=None, protocol=None):
3903 """
3904 Data will be read from input_dict or input JSON file, API will generate
3905 same prefixes, which were redistributed by either create_static_routes() or
3906 advertise_networks_using_network_command() and will verify next_hop and
3907 each prefix/routes is present in "show ip/ipv6 fib json"
3908 command o/p.
3909
3910 Parameters
3911 ----------
3912 * `tgen` : topogen object
3913 * `addr_type` : ip type, ipv4/ipv6
3914 * `dut`: Device Under Test, for which user wants to test the data
3915 * `input_dict` : input dict, has details of static routes
3916 * `next_hop`[optional]: next_hop which needs to be verified,
3917 default: static
3918
3919 Usage
3920 -----
3921 input_routes_r1 = {
3922 "r1": {
3923 "static_routes": [{
3924 "network": ["1.1.1.1/32],
3925 "next_hop": "Null0",
3926 "vrf": "RED"
3927 }]
3928 }
3929 }
3930 result = result = verify_fib_routes(tgen, "ipv4, "r1", input_routes_r1)
3931
3932 Returns
3933 -------
3934 errormsg(str) or True
3935 """
3936
3937 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
3938
3939 router_list = tgen.routers()
3940 if dut not in router_list:
3941 return
3942
3943 for routerInput in input_dict.keys():
3944 # XXX replace with router = dut; rnode = router_list[dut]
3945 for router, rnode in router_list.items():
3946 if router != dut:
3947 continue
3948
3949 logger.info("Checking router %s FIB routes:", router)
3950
3951 # Verifying RIB routes
3952 if addr_type == "ipv4":
3953 command = "show ip fib"
3954 else:
3955 command = "show ipv6 fib"
3956
3957 found_routes = []
3958 missing_routes = []
3959
3960 if protocol:
3961 command = "{} {}".format(command, protocol)
3962
3963 if "static_routes" in input_dict[routerInput]:
3964 static_routes = input_dict[routerInput]["static_routes"]
3965
3966 for static_route in static_routes:
3967 if "vrf" in static_route and static_route["vrf"] is not None:
3968
3969 logger.info(
3970 "[DUT: {}]: Verifying routes for VRF:"
3971 " {}".format(router, static_route["vrf"])
3972 )
3973
3974 cmd = "{} vrf {}".format(command, static_route["vrf"])
3975
3976 else:
3977 cmd = "{}".format(command)
3978
3979 cmd = "{} json".format(cmd)
3980
3981 rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True)
3982
3983 # Verifying output dictionary rib_routes_json is not empty
3984 if bool(rib_routes_json) is False:
3985 errormsg = "[DUT: {}]: No route found in fib".format(router)
3986 return errormsg
3987
3988 network = static_route["network"]
3989 if "no_of_ip" in static_route:
3990 no_of_ip = static_route["no_of_ip"]
3991 else:
3992 no_of_ip = 1
3993
3994 # Generating IPs for verification
3995 ip_list = generate_ips(network, no_of_ip)
3996 st_found = False
3997 nh_found = False
3998
3999 for st_rt in ip_list:
4000 st_rt = str(
4001 ipaddress.ip_network(frr_unicode(st_rt), strict=False)
4002 )
4003 _addr_type = validate_ip_address(st_rt)
4004 if _addr_type != addr_type:
4005 continue
4006
4007 if st_rt in rib_routes_json:
4008 st_found = True
4009 found_routes.append(st_rt)
4010
4011 if next_hop:
4012 if type(next_hop) is not list:
4013 next_hop = [next_hop]
4014
4015 count = 0
4016 for nh in next_hop:
4017 for nh_dict in rib_routes_json[st_rt][0][
4018 "nexthops"
4019 ]:
4020 if nh_dict["ip"] != nh:
4021 continue
4022 else:
4023 count += 1
4024
4025 if count == len(next_hop):
4026 nh_found = True
4027 else:
4028 missing_routes.append(st_rt)
4029 errormsg = (
4030 "Nexthop {} is Missing"
4031 " for route {} in "
4032 "RIB of router {}\n".format(
4033 next_hop, st_rt, dut
4034 )
4035 )
4036 return errormsg
4037
4038 else:
4039 missing_routes.append(st_rt)
4040
4041 if len(missing_routes) > 0:
4042 errormsg = "[DUT: {}]: Missing route in FIB:" " {}".format(
4043 dut, missing_routes
4044 )
4045 return errormsg
4046
4047 if nh_found:
4048 logger.info(
4049 "Found next_hop {} for all routes in RIB"
4050 " of router {}\n".format(next_hop, dut)
4051 )
4052
4053 if found_routes:
4054 logger.info(
4055 "[DUT: %s]: Verified routes in FIB, found" " routes are: %s\n",
4056 dut,
4057 found_routes,
4058 )
4059
4060 continue
4061
4062 if "bgp" in input_dict[routerInput]:
4063 if (
4064 "advertise_networks"
4065 not in input_dict[routerInput]["bgp"]["address_family"][addr_type][
4066 "unicast"
4067 ]
4068 ):
4069 continue
4070
4071 found_routes = []
4072 missing_routes = []
4073 advertise_network = input_dict[routerInput]["bgp"]["address_family"][
4074 addr_type
4075 ]["unicast"]["advertise_networks"]
4076
4077 # Continue if there are no network advertise
4078 if len(advertise_network) == 0:
4079 continue
4080
4081 for advertise_network_dict in advertise_network:
4082 if "vrf" in advertise_network_dict:
4083 cmd = "{} vrf {} json".format(command, static_route["vrf"])
4084 else:
4085 cmd = "{} json".format(command)
4086
4087 rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True)
4088
4089 # Verifying output dictionary rib_routes_json is not empty
4090 if bool(rib_routes_json) is False:
4091 errormsg = "No route found in rib of router {}..".format(router)
4092 return errormsg
4093
4094 start_ip = advertise_network_dict["network"]
4095 if "no_of_network" in advertise_network_dict:
4096 no_of_network = advertise_network_dict["no_of_network"]
4097 else:
4098 no_of_network = 1
4099
4100 # Generating IPs for verification
4101 ip_list = generate_ips(start_ip, no_of_network)
4102 st_found = False
4103 nh_found = False
4104
4105 for st_rt in ip_list:
4106 st_rt = str(ipaddress.ip_network(frr_unicode(st_rt), strict=False))
4107
4108 _addr_type = validate_ip_address(st_rt)
4109 if _addr_type != addr_type:
4110 continue
4111
4112 if st_rt in rib_routes_json:
4113 st_found = True
4114 found_routes.append(st_rt)
4115
4116 if next_hop:
4117 if type(next_hop) is not list:
4118 next_hop = [next_hop]
4119
4120 count = 0
4121 for nh in next_hop:
4122 for nh_dict in rib_routes_json[st_rt][0]["nexthops"]:
4123 if nh_dict["ip"] != nh:
4124 continue
4125 else:
4126 count += 1
4127
4128 if count == len(next_hop):
4129 nh_found = True
4130 else:
4131 missing_routes.append(st_rt)
4132 errormsg = (
4133 "Nexthop {} is Missing"
4134 " for route {} in "
4135 "RIB of router {}\n".format(next_hop, st_rt, dut)
4136 )
4137 return errormsg
4138 else:
4139 missing_routes.append(st_rt)
4140
4141 if len(missing_routes) > 0:
4142 errormsg = "[DUT: {}]: Missing route in FIB: " "{} \n".format(
4143 dut, missing_routes
4144 )
4145 return errormsg
4146
4147 if nh_found:
4148 logger.info(
4149 "Found next_hop {} for all routes in RIB"
4150 " of router {}\n".format(next_hop, dut)
4151 )
4152
4153 if found_routes:
4154 logger.info(
4155 "[DUT: {}]: Verified routes FIB"
4156 ", found routes are: {}\n".format(dut, found_routes)
4157 )
4158
4159 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
4160 return True
4161
4162
4163 def verify_admin_distance_for_static_routes(tgen, input_dict):
4164 """
4165 API to verify admin distance for static routes as defined in input_dict/
4166 input JSON by running show ip/ipv6 route json command.
4167 Parameter
4168 ---------
4169 * `tgen` : topogen object
4170 * `input_dict`: having details like - for which router and static routes
4171 admin dsitance needs to be verified
4172 Usage
4173 -----
4174 # To verify admin distance is 10 for prefix 10.0.20.1/32 having next_hop
4175 10.0.0.2 in router r1
4176 input_dict = {
4177 "r1": {
4178 "static_routes": [{
4179 "network": "10.0.20.1/32",
4180 "admin_distance": 10,
4181 "next_hop": "10.0.0.2"
4182 }]
4183 }
4184 }
4185 result = verify_admin_distance_for_static_routes(tgen, input_dict)
4186 Returns
4187 -------
4188 errormsg(str) or True
4189 """
4190
4191 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
4192
4193 router_list = tgen.routers()
4194 for router in input_dict.keys():
4195 if router not in router_list:
4196 continue
4197 rnode = router_list[router]
4198
4199 for static_route in input_dict[router]["static_routes"]:
4200 addr_type = validate_ip_address(static_route["network"])
4201 # Command to execute
4202 if addr_type == "ipv4":
4203 command = "show ip route json"
4204 else:
4205 command = "show ipv6 route json"
4206 show_ip_route_json = run_frr_cmd(rnode, command, isjson=True)
4207
4208 logger.info(
4209 "Verifying admin distance for static route %s" " under dut %s:",
4210 static_route,
4211 router,
4212 )
4213 network = static_route["network"]
4214 next_hop = static_route["next_hop"]
4215 admin_distance = static_route["admin_distance"]
4216 route_data = show_ip_route_json[network][0]
4217 if network in show_ip_route_json:
4218 if route_data["nexthops"][0]["ip"] == next_hop:
4219 if route_data["distance"] != admin_distance:
4220 errormsg = (
4221 "Verification failed: admin distance"
4222 " for static route {} under dut {},"
4223 " found:{} but expected:{}".format(
4224 static_route,
4225 router,
4226 route_data["distance"],
4227 admin_distance,
4228 )
4229 )
4230 return errormsg
4231 else:
4232 logger.info(
4233 "Verification successful: admin"
4234 " distance for static route %s under"
4235 " dut %s, found:%s",
4236 static_route,
4237 router,
4238 route_data["distance"],
4239 )
4240
4241 else:
4242 errormsg = (
4243 "Static route {} not found in "
4244 "show_ip_route_json for dut {}".format(network, router)
4245 )
4246 return errormsg
4247
4248 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
4249 return True
4250
4251
4252 def verify_prefix_lists(tgen, input_dict):
4253 """
4254 Running "show ip prefix-list" command and verifying given prefix-list
4255 is present in router.
4256 Parameters
4257 ----------
4258 * `tgen` : topogen object
4259 * `input_dict`: data to verify prefix lists
4260 Usage
4261 -----
4262 # To verify pf_list_1 is present in router r1
4263 input_dict = {
4264 "r1": {
4265 "prefix_lists": ["pf_list_1"]
4266 }}
4267 result = verify_prefix_lists("ipv4", input_dict, tgen)
4268 Returns
4269 -------
4270 errormsg(str) or True
4271 """
4272
4273 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
4274
4275 router_list = tgen.routers()
4276 for router in input_dict.keys():
4277 if router not in router_list:
4278 continue
4279
4280 rnode = router_list[router]
4281
4282 # Show ip prefix list
4283 show_prefix_list = run_frr_cmd(rnode, "show ip prefix-list")
4284
4285 # Verify Prefix list is deleted
4286 prefix_lists_addr = input_dict[router]["prefix_lists"]
4287 for addr_type in prefix_lists_addr:
4288 if not check_address_types(addr_type):
4289 continue
4290 # show ip prefix list
4291 if addr_type == "ipv4":
4292 cmd = "show ip prefix-list"
4293 else:
4294 cmd = "show {} prefix-list".format(addr_type)
4295 show_prefix_list = run_frr_cmd(rnode, cmd)
4296 for prefix_list in prefix_lists_addr[addr_type].keys():
4297 if prefix_list in show_prefix_list:
4298 errormsg = (
4299 "Prefix list {} is/are present in the router"
4300 " {}".format(prefix_list, router)
4301 )
4302 return errormsg
4303
4304 logger.info(
4305 "Prefix list %s is/are not present in the router" " from router %s",
4306 prefix_list,
4307 router,
4308 )
4309
4310 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
4311 return True
4312
4313
4314 @retry(retry_timeout=12)
4315 def verify_route_maps(tgen, input_dict):
4316 """
4317 Running "show route-map" command and verifying given route-map
4318 is present in router.
4319 Parameters
4320 ----------
4321 * `tgen` : topogen object
4322 * `input_dict`: data to verify prefix lists
4323 Usage
4324 -----
4325 # To verify rmap_1 and rmap_2 are present in router r1
4326 input_dict = {
4327 "r1": {
4328 "route_maps": ["rmap_1", "rmap_2"]
4329 }
4330 }
4331 result = verify_route_maps(tgen, input_dict)
4332 Returns
4333 -------
4334 errormsg(str) or True
4335 """
4336
4337 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
4338
4339 router_list = tgen.routers()
4340 for router in input_dict.keys():
4341 if router not in router_list:
4342 continue
4343
4344 rnode = router_list[router]
4345 # Show ip route-map
4346 show_route_maps = rnode.vtysh_cmd("show route-map")
4347
4348 # Verify route-map is deleted
4349 route_maps = input_dict[router]["route_maps"]
4350 for route_map in route_maps:
4351 if route_map in show_route_maps:
4352 errormsg = "Route map {} is not deleted from router" " {}".format(
4353 route_map, router
4354 )
4355 return errormsg
4356
4357 logger.info(
4358 "Route map %s is/are deleted successfully from" " router %s",
4359 route_maps,
4360 router,
4361 )
4362
4363 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
4364 return True
4365
4366
4367 @retry(retry_timeout=16)
4368 def verify_bgp_community(tgen, addr_type, router, network, input_dict=None):
4369 """
4370 API to veiryf BGP large community is attached in route for any given
4371 DUT by running "show bgp ipv4/6 {route address} json" command.
4372 Parameters
4373 ----------
4374 * `tgen`: topogen object
4375 * `addr_type` : ip type, ipv4/ipv6
4376 * `dut`: Device Under Test
4377 * `network`: network for which set criteria needs to be verified
4378 * `input_dict`: having details like - for which router, community and
4379 values needs to be verified
4380 Usage
4381 -----
4382 networks = ["200.50.2.0/32"]
4383 input_dict = {
4384 "largeCommunity": "2:1:1 2:2:2 2:3:3 2:4:4 2:5:5"
4385 }
4386 result = verify_bgp_community(tgen, "ipv4", dut, network, input_dict=None)
4387 Returns
4388 -------
4389 errormsg(str) or True
4390 """
4391
4392 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
4393 router_list = tgen.routers()
4394 if router not in router_list:
4395 return False
4396
4397 rnode = router_list[router]
4398
4399 logger.debug(
4400 "Verifying BGP community attributes on dut %s: for %s " "network %s",
4401 router,
4402 addr_type,
4403 network,
4404 )
4405
4406 for net in network:
4407 cmd = "show bgp {} {} json".format(addr_type, net)
4408 show_bgp_json = rnode.vtysh_cmd(cmd, isjson=True)
4409 logger.info(show_bgp_json)
4410 if "paths" not in show_bgp_json:
4411 return "Prefix {} not found in BGP table of router: {}".format(net, router)
4412
4413 as_paths = show_bgp_json["paths"]
4414 found = False
4415 for i in range(len(as_paths)):
4416 if (
4417 "largeCommunity" in show_bgp_json["paths"][i]
4418 or "community" in show_bgp_json["paths"][i]
4419 ):
4420 found = True
4421 logger.info(
4422 "Large Community attribute is found for route:" " %s in router: %s",
4423 net,
4424 router,
4425 )
4426 if input_dict is not None:
4427 for criteria, comm_val in input_dict.items():
4428 show_val = show_bgp_json["paths"][i][criteria]["string"]
4429 if comm_val == show_val:
4430 logger.info(
4431 "Verifying BGP %s for prefix: %s"
4432 " in router: %s, found expected"
4433 " value: %s",
4434 criteria,
4435 net,
4436 router,
4437 comm_val,
4438 )
4439 else:
4440 errormsg = (
4441 "Failed: Verifying BGP attribute"
4442 " {} for route: {} in router: {}"
4443 ", expected value: {} but found"
4444 ": {}".format(criteria, net, router, comm_val, show_val)
4445 )
4446 return errormsg
4447
4448 if not found:
4449 errormsg = (
4450 "Large Community attribute is not found for route: "
4451 "{} in router: {} ".format(net, router)
4452 )
4453 return errormsg
4454
4455 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
4456 return True
4457
4458
4459 def get_ipv6_linklocal_address(topo, node, intf):
4460 """
4461 API to get the link local ipv6 address of a particular interface
4462
4463 Parameters
4464 ----------
4465 * `node`: node on which link local ip to be fetched.
4466 * `intf` : interface for which link local ip needs to be returned.
4467 * `topo` : base topo
4468
4469 Usage
4470 -----
4471 result = get_ipv6_linklocal_address(topo, 'r1', 'r2')
4472
4473 Returns link local ip of interface between r1 and r2.
4474
4475 Returns
4476 -------
4477 1) link local ipv6 address from the interface
4478 2) errormsg - when link local ip not found
4479 """
4480 tgen = get_topogen()
4481 ext_nh = tgen.net[node].get_ipv6_linklocal()
4482 req_nh = topo[node]["links"][intf]["interface"]
4483 llip = None
4484 for llips in ext_nh:
4485 if llips[0] == req_nh:
4486 llip = llips[1]
4487 logger.info("Link local ip found = %s", llip)
4488 return llip
4489
4490 errormsg = "Failed: Link local ip not found on router {}, " "interface {}".format(
4491 node, intf
4492 )
4493
4494 return errormsg
4495
4496
4497 def verify_create_community_list(tgen, input_dict):
4498 """
4499 API is to verify if large community list is created for any given DUT in
4500 input_dict by running "sh bgp large-community-list {"comm_name"} detail"
4501 command.
4502 Parameters
4503 ----------
4504 * `tgen`: topogen object
4505 * `input_dict`: having details like - for which router, large community
4506 needs to be verified
4507 Usage
4508 -----
4509 input_dict = {
4510 "r1": {
4511 "large-community-list": {
4512 "standard": {
4513 "Test1": [{"action": "PERMIT", "attribute":\
4514 ""}]
4515 }}}}
4516 result = verify_create_community_list(tgen, input_dict)
4517 Returns
4518 -------
4519 errormsg(str) or True
4520 """
4521
4522 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
4523
4524 router_list = tgen.routers()
4525 for router in input_dict.keys():
4526 if router not in router_list:
4527 continue
4528
4529 rnode = router_list[router]
4530
4531 logger.info("Verifying large-community is created for dut %s:", router)
4532
4533 for comm_data in input_dict[router]["bgp_community_lists"]:
4534 comm_name = comm_data["name"]
4535 comm_type = comm_data["community_type"]
4536 show_bgp_community = run_frr_cmd(
4537 rnode, "show bgp large-community-list {} detail".format(comm_name)
4538 )
4539
4540 # Verify community list and type
4541 if comm_name in show_bgp_community and comm_type in show_bgp_community:
4542 logger.info(
4543 "BGP %s large-community-list %s is" " created", comm_type, comm_name
4544 )
4545 else:
4546 errormsg = "BGP {} large-community-list {} is not" " created".format(
4547 comm_type, comm_name
4548 )
4549 return errormsg
4550
4551 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
4552 return True
4553
4554
4555 def verify_cli_json(tgen, input_dict):
4556 """
4557 API to verify if JSON is available for clis
4558 command.
4559 Parameters
4560 ----------
4561 * `tgen`: topogen object
4562 * `input_dict`: CLIs for which JSON needs to be verified
4563 Usage
4564 -----
4565 input_dict = {
4566 "edge1":{
4567 "cli": ["show evpn vni detail", show evpn rmac vni all]
4568 }
4569 }
4570
4571 result = verify_cli_json(tgen, input_dict)
4572
4573 Returns
4574 -------
4575 errormsg(str) or True
4576 """
4577
4578 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
4579 for dut in input_dict.keys():
4580 rnode = tgen.gears[dut]
4581
4582 for cli in input_dict[dut]["cli"]:
4583 logger.info(
4584 "[DUT: %s]: Verifying JSON is available for " "CLI %s :", dut, cli
4585 )
4586
4587 test_cli = "{} json".format(cli)
4588 ret_json = rnode.vtysh_cmd(test_cli, isjson=True)
4589 if not bool(ret_json):
4590 errormsg = "CLI: %s, JSON format is not available" % (cli)
4591 return errormsg
4592 elif "unknown" in ret_json or "Unknown" in ret_json:
4593 errormsg = "CLI: %s, JSON format is not available" % (cli)
4594 return errormsg
4595 else:
4596 logger.info(
4597 "CLI : %s JSON format is available: " "\n %s", cli, ret_json
4598 )
4599
4600 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
4601
4602 return True
4603
4604
4605 @retry(retry_timeout=12)
4606 def verify_evpn_vni(tgen, input_dict):
4607 """
4608 API to verify evpn vni details using "show evpn vni detail json"
4609 command.
4610
4611 Parameters
4612 ----------
4613 * `tgen`: topogen object
4614 * `input_dict`: having details like - for which router, evpn details
4615 needs to be verified
4616 Usage
4617 -----
4618 input_dict = {
4619 "edge1":{
4620 "vni": [
4621 {
4622 "75100":{
4623 "vrf": "RED",
4624 "vxlanIntf": "vxlan75100",
4625 "localVtepIp": "120.1.1.1",
4626 "sviIntf": "br100"
4627 }
4628 }
4629 ]
4630 }
4631 }
4632
4633 result = verify_evpn_vni(tgen, input_dict)
4634
4635 Returns
4636 -------
4637 errormsg(str) or True
4638 """
4639
4640 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
4641 for dut in input_dict.keys():
4642 rnode = tgen.gears[dut]
4643
4644 logger.info("[DUT: %s]: Verifying evpn vni details :", dut)
4645
4646 cmd = "show evpn vni detail json"
4647 evpn_all_vni_json = run_frr_cmd(rnode, cmd, isjson=True)
4648 if not bool(evpn_all_vni_json):
4649 errormsg = "No output for '{}' cli".format(cmd)
4650 return errormsg
4651
4652 if "vni" in input_dict[dut]:
4653 for vni_dict in input_dict[dut]["vni"]:
4654 found = False
4655 vni = vni_dict["name"]
4656 for evpn_vni_json in evpn_all_vni_json:
4657 if "vni" in evpn_vni_json:
4658 if evpn_vni_json["vni"] != int(vni):
4659 continue
4660
4661 for attribute in vni_dict.keys():
4662 if vni_dict[attribute] != evpn_vni_json[attribute]:
4663 errormsg = (
4664 "[DUT: %s] Verifying "
4665 "%s for VNI: %s [FAILED]||"
4666 ", EXPECTED : %s "
4667 " FOUND : %s"
4668 % (
4669 dut,
4670 attribute,
4671 vni,
4672 vni_dict[attribute],
4673 evpn_vni_json[attribute],
4674 )
4675 )
4676 return errormsg
4677
4678 else:
4679 found = True
4680 logger.info(
4681 "[DUT: %s] Verifying"
4682 " %s for VNI: %s , "
4683 "Found Expected : %s ",
4684 dut,
4685 attribute,
4686 vni,
4687 evpn_vni_json[attribute],
4688 )
4689
4690 if evpn_vni_json["state"] != "Up":
4691 errormsg = (
4692 "[DUT: %s] Failed: Verifying"
4693 " State for VNI: %s is not Up" % (dut, vni)
4694 )
4695 return errormsg
4696
4697 else:
4698 errormsg = (
4699 "[DUT: %s] Failed:"
4700 " VNI: %s is not present in JSON" % (dut, vni)
4701 )
4702 return errormsg
4703
4704 if found:
4705 logger.info(
4706 "[DUT %s]: Verifying VNI : %s "
4707 "details and state is Up [PASSED]!!",
4708 dut,
4709 vni,
4710 )
4711 return True
4712
4713 else:
4714 errormsg = (
4715 "[DUT: %s] Failed:" " vni details are not present in input data" % (dut)
4716 )
4717 return errormsg
4718
4719 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
4720 return False
4721
4722
4723 @retry(retry_timeout=12)
4724 def verify_vrf_vni(tgen, input_dict):
4725 """
4726 API to verify vrf vni details using "show vrf vni json"
4727 command.
4728 Parameters
4729 ----------
4730 * `tgen`: topogen object
4731 * `input_dict`: having details like - for which router, evpn details
4732 needs to be verified
4733 Usage
4734 -----
4735 input_dict = {
4736 "edge1":{
4737 "vrfs": [
4738 {
4739 "RED":{
4740 "vni": 75000,
4741 "vxlanIntf": "vxlan75100",
4742 "sviIntf": "br100",
4743 "routerMac": "00:80:48:ba:d1:00",
4744 "state": "Up"
4745 }
4746 }
4747 ]
4748 }
4749 }
4750
4751 result = verify_vrf_vni(tgen, input_dict)
4752
4753 Returns
4754 -------
4755 errormsg(str) or True
4756 """
4757
4758 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
4759 for dut in input_dict.keys():
4760 rnode = tgen.gears[dut]
4761
4762 logger.info("[DUT: %s]: Verifying vrf vni details :", dut)
4763
4764 cmd = "show vrf vni json"
4765 vrf_all_vni_json = run_frr_cmd(rnode, cmd, isjson=True)
4766 if not bool(vrf_all_vni_json):
4767 errormsg = "No output for '{}' cli".format(cmd)
4768 return errormsg
4769
4770 if "vrfs" in input_dict[dut]:
4771 for vrfs in input_dict[dut]["vrfs"]:
4772 for vrf, vrf_dict in vrfs.items():
4773 found = False
4774 for vrf_vni_json in vrf_all_vni_json["vrfs"]:
4775 if "vrf" in vrf_vni_json:
4776 if vrf_vni_json["vrf"] != vrf:
4777 continue
4778
4779 for attribute in vrf_dict.keys():
4780 if vrf_dict[attribute] == vrf_vni_json[attribute]:
4781 found = True
4782 logger.info(
4783 "[DUT %s]: VRF: %s, "
4784 "verifying %s "
4785 ", Found Expected: %s "
4786 "[PASSED]!!",
4787 dut,
4788 vrf,
4789 attribute,
4790 vrf_vni_json[attribute],
4791 )
4792 else:
4793 errormsg = (
4794 "[DUT: %s] VRF: %s, "
4795 "verifying %s [FAILED!!] "
4796 ", EXPECTED : %s "
4797 ", FOUND : %s"
4798 % (
4799 dut,
4800 vrf,
4801 attribute,
4802 vrf_dict[attribute],
4803 vrf_vni_json[attribute],
4804 )
4805 )
4806 return errormsg
4807
4808 else:
4809 errormsg = "[DUT: %s] VRF: %s " "is not present in JSON" % (
4810 dut,
4811 vrf,
4812 )
4813 return errormsg
4814
4815 if found:
4816 logger.info(
4817 "[DUT %s] Verifying VRF: %s " " details [PASSED]!!",
4818 dut,
4819 vrf,
4820 )
4821 return True
4822
4823 else:
4824 errormsg = (
4825 "[DUT: %s] Failed:" " vrf details are not present in input data" % (dut)
4826 )
4827 return errormsg
4828
4829 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
4830 return False
4831
4832
4833 def required_linux_kernel_version(required_version):
4834 """
4835 This API is used to check linux version compatibility of the test suite.
4836 If version mentioned in required_version is higher than the linux kernel
4837 of the system, test suite will be skipped. This API returns true or errormsg.
4838
4839 Parameters
4840 ----------
4841 * `required_version` : Kernel version required for the suites to run.
4842
4843 Usage
4844 -----
4845 result = linux_kernel_version_lowerthan('4.15')
4846
4847 Returns
4848 -------
4849 errormsg(str) or True
4850 """
4851 system_kernel = platform.release()
4852 if version_cmp(system_kernel, required_version) < 0:
4853 error_msg = (
4854 'These tests will not run on kernel "{}", '
4855 "they require kernel >= {})".format(system_kernel, required_version)
4856 )
4857
4858 logger.info(error_msg)
4859
4860 return error_msg
4861 return True
4862
4863
4864 class HostApplicationHelper(object):
4865 """Helper to track and cleanup per-host based test processes."""
4866
4867 def __init__(self, tgen=None, base_cmd=None):
4868 self.base_cmd_str = ""
4869 self.host_procs = {}
4870 self.tgen = None
4871 self.set_base_cmd(base_cmd if base_cmd else [])
4872 if tgen is not None:
4873 self.init(tgen)
4874
4875 def __enter__(self):
4876 self.init()
4877 return self
4878
4879 def __exit__(self, type, value, traceback):
4880 self.cleanup()
4881
4882 def __str__(self):
4883 return "HostApplicationHelper({})".format(self.base_cmd_str)
4884
4885 def set_base_cmd(self, base_cmd):
4886 assert isinstance(base_cmd, list) or isinstance(base_cmd, tuple)
4887 self.base_cmd = base_cmd
4888 if base_cmd:
4889 self.base_cmd_str = " ".join(base_cmd)
4890 else:
4891 self.base_cmd_str = ""
4892
4893 def init(self, tgen=None):
4894 """Initialize the helper with tgen if needed.
4895
4896 If overridden, need to handle multiple entries but one init. Will be called on
4897 object creation if tgen is supplied. Will be called again on __enter__ so should
4898 not re-init if already inited.
4899 """
4900 if self.tgen:
4901 assert tgen is None or self.tgen == tgen
4902 else:
4903 self.tgen = tgen
4904
4905 def started_proc(self, host, p):
4906 """Called after process started on host.
4907
4908 Return value is passed to `stopping_proc` method."""
4909 logger.debug("%s: Doing nothing after starting process", self)
4910 return False
4911
4912 def stopping_proc(self, host, p, info):
4913 """Called after process started on host."""
4914 logger.debug("%s: Doing nothing before stopping process", self)
4915
4916 def _add_host_proc(self, host, p):
4917 v = self.started_proc(host, p)
4918
4919 if host not in self.host_procs:
4920 self.host_procs[host] = []
4921 logger.debug("%s: %s: tracking process %s", self, host, p)
4922 self.host_procs[host].append((p, v))
4923
4924 def stop_host(self, host):
4925 """Stop the process on the host.
4926
4927 Override to do additional cleanup."""
4928 if host in self.host_procs:
4929 hlogger = self.tgen.net[host].logger
4930 for p, v in self.host_procs[host]:
4931 self.stopping_proc(host, p, v)
4932 logger.debug("%s: %s: terminating process %s", self, host, p.pid)
4933 hlogger.debug("%s: %s: terminating process %s", self, host, p.pid)
4934 rc = p.poll()
4935 if rc is not None:
4936 logger.error(
4937 "%s: %s: process early exit %s: %s",
4938 self,
4939 host,
4940 p.pid,
4941 comm_error(p),
4942 )
4943 hlogger.error(
4944 "%s: %s: process early exit %s: %s",
4945 self,
4946 host,
4947 p.pid,
4948 comm_error(p),
4949 )
4950 else:
4951 p.terminate()
4952 p.wait()
4953 logger.debug(
4954 "%s: %s: terminated process %s: %s",
4955 self,
4956 host,
4957 p.pid,
4958 comm_error(p),
4959 )
4960 hlogger.debug(
4961 "%s: %s: terminated process %s: %s",
4962 self,
4963 host,
4964 p.pid,
4965 comm_error(p),
4966 )
4967
4968 del self.host_procs[host]
4969
4970 def stop_all_hosts(self):
4971 hosts = set(self.host_procs)
4972 for host in hosts:
4973 self.stop_host(host)
4974
4975 def cleanup(self):
4976 self.stop_all_hosts()
4977
4978 def run(self, host, cmd_args, **kwargs):
4979 cmd = list(self.base_cmd)
4980 cmd.extend(cmd_args)
4981 p = self.tgen.gears[host].popen(cmd, **kwargs)
4982 assert p.poll() is None
4983 self._add_host_proc(host, p)
4984 return p
4985
4986 def check_procs(self):
4987 """Check that all current processes are running, log errors if not.
4988
4989 Returns: List of stopped processes."""
4990 procs = []
4991
4992 logger.debug("%s: checking procs on hosts %s", self, self.host_procs.keys())
4993
4994 for host in self.host_procs:
4995 hlogger = self.tgen.net[host].logger
4996 for p, _ in self.host_procs[host]:
4997 logger.debug("%s: checking %s proc %s", self, host, p)
4998 rc = p.poll()
4999 if rc is None:
5000 continue
5001 logger.error(
5002 "%s: %s proc exited: %s", self, host, comm_error(p), exc_info=True
5003 )
5004 hlogger.error(
5005 "%s: %s proc exited: %s", self, host, comm_error(p), exc_info=True
5006 )
5007 procs.append(p)
5008 return procs
5009
5010
5011 class IPerfHelper(HostApplicationHelper):
5012 def __str__(self):
5013 return "IPerfHelper()"
5014
5015 def run_join(
5016 self,
5017 host,
5018 join_addr,
5019 l4Type="UDP",
5020 join_interval=1,
5021 join_intf=None,
5022 join_towards=None,
5023 ):
5024 """
5025 Use iperf to send IGMP join and listen to traffic
5026
5027 Parameters:
5028 -----------
5029 * `host`: iperf host from where IGMP join would be sent
5030 * `l4Type`: string, one of [ TCP, UDP ]
5031 * `join_addr`: multicast address (or addresses) to join to
5032 * `join_interval`: seconds between periodic bandwidth reports
5033 * `join_intf`: the interface to bind the join to
5034 * `join_towards`: router whos interface to bind the join to
5035
5036 returns: Success (bool)
5037 """
5038
5039 iperf_path = self.tgen.net.get_exec_path("iperf")
5040
5041 assert join_addr
5042 if not isinstance(join_addr, list) and not isinstance(join_addr, tuple):
5043 join_addr = [ipaddress.IPv4Address(frr_unicode(join_addr))]
5044
5045 for bindTo in join_addr:
5046 iperf_args = [iperf_path, "-s"]
5047
5048 if l4Type == "UDP":
5049 iperf_args.append("-u")
5050
5051 iperf_args.append("-B")
5052 if join_towards:
5053 to_intf = frr_unicode(
5054 self.tgen.json_topo["routers"][host]["links"][join_towards][
5055 "interface"
5056 ]
5057 )
5058 iperf_args.append("{}%{}".format(str(bindTo), to_intf))
5059 elif join_intf:
5060 iperf_args.append("{}%{}".format(str(bindTo), join_intf))
5061 else:
5062 iperf_args.append(str(bindTo))
5063
5064 if join_interval:
5065 iperf_args.append("-i")
5066 iperf_args.append(str(join_interval))
5067
5068 p = self.run(host, iperf_args)
5069 if p.poll() is not None:
5070 logger.error("IGMP join failed on %s: %s", bindTo, comm_error(p))
5071 return False
5072 return True
5073
5074 def run_traffic(
5075 self, host, sentToAddress, ttl, time=0, l4Type="UDP", bind_towards=None
5076 ):
5077 """
5078 Run iperf to send IGMP join and traffic
5079
5080 Parameters:
5081 -----------
5082 * `host`: iperf host to send traffic from
5083 * `l4Type`: string, one of [ TCP, UDP ]
5084 * `sentToAddress`: multicast address to send traffic to
5085 * `ttl`: time to live
5086 * `time`: time in seconds to transmit for
5087 * `bind_towards`: Router who's interface the source ip address is got from
5088
5089 returns: Success (bool)
5090 """
5091
5092 iperf_path = self.tgen.net.get_exec_path("iperf")
5093
5094 if sentToAddress and not isinstance(sentToAddress, list):
5095 sentToAddress = [ipaddress.IPv4Address(frr_unicode(sentToAddress))]
5096
5097 for sendTo in sentToAddress:
5098 iperf_args = [iperf_path, "-c", sendTo]
5099
5100 # Bind to Interface IP
5101 if bind_towards:
5102 ifaddr = frr_unicode(
5103 self.tgen.json_topo["routers"][host]["links"][bind_towards]["ipv4"]
5104 )
5105 ipaddr = ipaddress.IPv4Interface(ifaddr).ip
5106 iperf_args.append("-B")
5107 iperf_args.append(str(ipaddr))
5108
5109 # UDP/TCP
5110 if l4Type == "UDP":
5111 iperf_args.append("-u")
5112 iperf_args.append("-b")
5113 iperf_args.append("0.012m")
5114
5115 # TTL
5116 if ttl:
5117 iperf_args.append("-T")
5118 iperf_args.append(str(ttl))
5119
5120 # Time
5121 if time:
5122 iperf_args.append("-t")
5123 iperf_args.append(str(time))
5124
5125 p = self.run(host, iperf_args)
5126 if p.poll() is not None:
5127 logger.error(
5128 "mcast traffic send failed for %s: %s", sendTo, comm_error(p)
5129 )
5130 return False
5131
5132 return True
5133
5134
5135 def verify_ip_nht(tgen, input_dict):
5136 """
5137 Running "show ip nht" command and verifying given nexthop resolution
5138 Parameters
5139 ----------
5140 * `tgen` : topogen object
5141 * `input_dict`: data to verify nexthop
5142 Usage
5143 -----
5144 input_dict_4 = {
5145 "r1": {
5146 nh: {
5147 "Address": nh,
5148 "resolvedVia": "connected",
5149 "nexthops": {
5150 "nexthop1": {
5151 "Interface": intf
5152 }
5153 }
5154 }
5155 }
5156 }
5157 result = verify_ip_nht(tgen, input_dict_4)
5158 Returns
5159 -------
5160 errormsg(str) or True
5161 """
5162
5163 logger.debug("Entering lib API: verify_ip_nht()")
5164
5165 router_list = tgen.routers()
5166 for router in input_dict.keys():
5167 if router not in router_list:
5168 continue
5169
5170 rnode = router_list[router]
5171 nh_list = input_dict[router]
5172
5173 if validate_ip_address(next(iter(nh_list))) == "ipv6":
5174 show_ip_nht = run_frr_cmd(rnode, "show ipv6 nht")
5175 else:
5176 show_ip_nht = run_frr_cmd(rnode, "show ip nht")
5177
5178 for nh in nh_list:
5179 if nh in show_ip_nht:
5180 nht = run_frr_cmd(rnode, "show ip nht {}".format(nh))
5181 if "unresolved" in nht:
5182 errormsg = "Nexthop {} became unresolved on {}".format(nh, router)
5183 return errormsg
5184 else:
5185 logger.info("Nexthop %s is resolved on %s", nh, router)
5186 return True
5187 else:
5188 errormsg = "Nexthop {} is resolved on {}".format(nh, router)
5189 return errormsg
5190
5191 logger.debug("Exiting lib API: verify_ip_nht()")
5192 return False
5193
5194
5195 def scapy_send_raw_packet(tgen, topo, senderRouter, intf, packet=None):
5196 """
5197 Using scapy Raw() method to send BSR raw packet from one FRR
5198 to other
5199
5200 Parameters:
5201 -----------
5202 * `tgen` : Topogen object
5203 * `topo` : json file data
5204 * `senderRouter` : Sender router
5205 * `packet` : packet in raw format
5206
5207 returns:
5208 --------
5209 errormsg or True
5210 """
5211
5212 global CD
5213 result = ""
5214 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
5215 sender_interface = intf
5216 rnode = tgen.routers()[senderRouter]
5217
5218 for destLink, data in topo["routers"][senderRouter]["links"].items():
5219 if "type" in data and data["type"] == "loopback":
5220 continue
5221
5222 if not packet:
5223 packet = topo["routers"][senderRouter]["pkt"]["test_packets"][packet][
5224 "data"
5225 ]
5226
5227 python3_path = tgen.net.get_exec_path(["python3", "python"])
5228 script_path = os.path.join(CD, "send_bsr_packet.py")
5229 cmd = "{} {} '{}' '{}' --interval=1 --count=1".format(
5230 python3_path, script_path, packet, sender_interface
5231 )
5232
5233 logger.info("Scapy cmd: \n %s", cmd)
5234 result = rnode.run(cmd)
5235
5236 if result == "":
5237 return result
5238
5239 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
5240 return True