]> git.proxmox.com Git - mirror_frr.git/blob - tests/topotests/lib/common_config.py
Merge pull request #12366 from manojvn/ospfv2-flood-reduction
[mirror_frr.git] / tests / topotests / lib / common_config.py
1 # SPDX-License-Identifier: ISC
2 #
3 # Copyright (c) 2019 by VMware, Inc. ("VMware")
4 # Used Copyright (c) 2018 by Network Device Education Foundation, Inc.
5 # ("NetDEF") in this file.
6 #
7
8 import ipaddress
9 import json
10 import os
11 import platform
12 import socket
13 import subprocess
14 import sys
15 import traceback
16 import functools
17 from collections import OrderedDict
18 from copy import deepcopy
19 from datetime import datetime, timedelta
20 from functools import wraps
21 from re import search as re_search
22 from time import sleep
23
24 try:
25 # Imports from python2
26 import ConfigParser as configparser
27 except ImportError:
28 # Imports from python3
29 import configparser
30
31 from lib.micronet import comm_error
32 from lib.topogen import TopoRouter, get_topogen
33 from lib.topolog import get_logger, logger
34 from lib.topotest import frr_unicode, interface_set_status, version_cmp
35 from lib import topotest
36
37 FRRCFG_FILE = "frr_json.conf"
38 FRRCFG_BKUP_FILE = "frr_json_initial.conf"
39
40 ERROR_LIST = ["Malformed", "Failure", "Unknown", "Incomplete"]
41
42 ####
43 CD = os.path.dirname(os.path.realpath(__file__))
44 PYTESTINI_PATH = os.path.join(CD, "../pytest.ini")
45
46 # NOTE: to save execution logs to log file frrtest_log_dir must be configured
47 # in `pytest.ini`.
48 config = configparser.ConfigParser()
49 config.read(PYTESTINI_PATH)
50
51 config_section = "topogen"
52
53 # Debug logs for daemons
54 DEBUG_LOGS = {
55 "pimd": [
56 "debug msdp events",
57 "debug msdp packets",
58 "debug igmp events",
59 "debug igmp trace",
60 "debug mroute",
61 "debug mroute detail",
62 "debug pim events",
63 "debug pim packets",
64 "debug pim trace",
65 "debug pim zebra",
66 "debug pim bsm",
67 "debug pim packets joins",
68 "debug pim packets register",
69 "debug pim nht",
70 ],
71 "pim6d": [
72 "debug pimv6 events",
73 "debug pimv6 packets",
74 "debug pimv6 packet-dump send",
75 "debug pimv6 packet-dump receive",
76 "debug pimv6 trace",
77 "debug pimv6 trace detail",
78 "debug pimv6 zebra",
79 "debug pimv6 bsm",
80 "debug pimv6 packets hello",
81 "debug pimv6 packets joins",
82 "debug pimv6 packets register",
83 "debug pimv6 nht",
84 "debug pimv6 nht detail",
85 "debug mroute6",
86 "debug mroute6 detail",
87 "debug mld events",
88 "debug mld packets",
89 "debug mld trace",
90 ],
91 "bgpd": [
92 "debug bgp neighbor-events",
93 "debug bgp updates",
94 "debug bgp zebra",
95 "debug bgp nht",
96 "debug bgp neighbor-events",
97 "debug bgp graceful-restart",
98 "debug bgp update-groups",
99 "debug bgp vpn leak-from-vrf",
100 "debug bgp vpn leak-to-vrf",
101 "debug bgp zebr",
102 "debug bgp updates",
103 "debug bgp nht",
104 "debug bgp neighbor-events",
105 "debug vrf",
106 ],
107 "zebra": [
108 "debug zebra events",
109 "debug zebra rib",
110 "debug zebra vxlan",
111 "debug zebra nht",
112 ],
113 "ospf": [
114 "debug ospf event",
115 "debug ospf ism",
116 "debug ospf lsa",
117 "debug ospf nsm",
118 "debug ospf nssa",
119 "debug ospf packet all",
120 "debug ospf sr",
121 "debug ospf te",
122 "debug ospf zebra",
123 ],
124 "ospf6": [
125 "debug ospf6 event",
126 "debug ospf6 ism",
127 "debug ospf6 lsa",
128 "debug ospf6 nsm",
129 "debug ospf6 nssa",
130 "debug ospf6 packet all",
131 "debug ospf6 sr",
132 "debug ospf6 te",
133 "debug ospf6 zebra",
134 ],
135 }
136
137 g_iperf_client_procs = {}
138 g_iperf_server_procs = {}
139
140
141 def is_string(value):
142 try:
143 return isinstance(value, basestring)
144 except NameError:
145 return isinstance(value, str)
146
147
148 if config.has_option("topogen", "verbosity"):
149 loglevel = config.get("topogen", "verbosity")
150 loglevel = loglevel.lower()
151 else:
152 loglevel = "info"
153
154 if config.has_option("topogen", "frrtest_log_dir"):
155 frrtest_log_dir = config.get("topogen", "frrtest_log_dir")
156 time_stamp = datetime.time(datetime.now())
157 logfile_name = "frr_test_bgp_"
158 frrtest_log_file = frrtest_log_dir + logfile_name + str(time_stamp)
159 print("frrtest_log_file..", frrtest_log_file)
160
161 logger = get_logger(
162 "test_execution_logs", log_level=loglevel, target=frrtest_log_file
163 )
164 print("Logs will be sent to logfile: {}".format(frrtest_log_file))
165
166 if config.has_option("topogen", "show_router_config"):
167 show_router_config = config.get("topogen", "show_router_config")
168 else:
169 show_router_config = False
170
171 # env variable for setting what address type to test
172 ADDRESS_TYPES = os.environ.get("ADDRESS_TYPES")
173
174
175 # Saves sequence id numbers
176 SEQ_ID = {"prefix_lists": {}, "route_maps": {}}
177
178
179 def get_seq_id(obj_type, router, obj_name):
180 """
181 Generates and saves sequence number in interval of 10
182 Parameters
183 ----------
184 * `obj_type`: prefix_lists or route_maps
185 * `router`: router name
186 *` obj_name`: name of the prefix-list or route-map
187 Returns
188 --------
189 Sequence number generated
190 """
191
192 router_data = SEQ_ID[obj_type].setdefault(router, {})
193 obj_data = router_data.setdefault(obj_name, {})
194 seq_id = obj_data.setdefault("seq_id", 0)
195
196 seq_id = int(seq_id) + 10
197 obj_data["seq_id"] = seq_id
198
199 return seq_id
200
201
202 def set_seq_id(obj_type, router, id, obj_name):
203 """
204 Saves sequence number if not auto-generated and given by user
205 Parameters
206 ----------
207 * `obj_type`: prefix_lists or route_maps
208 * `router`: router name
209 *` obj_name`: name of the prefix-list or route-map
210 """
211 router_data = SEQ_ID[obj_type].setdefault(router, {})
212 obj_data = router_data.setdefault(obj_name, {})
213 seq_id = obj_data.setdefault("seq_id", 0)
214
215 seq_id = int(seq_id) + int(id)
216 obj_data["seq_id"] = seq_id
217
218
219 class InvalidCLIError(Exception):
220 """Raise when the CLI command is wrong"""
221
222
223 def run_frr_cmd(rnode, cmd, isjson=False):
224 """
225 Execute frr show commands in privileged mode
226 * `rnode`: router node on which command needs to be executed
227 * `cmd`: Command to be executed on frr
228 * `isjson`: If command is to get json data or not
229 :return str:
230 """
231
232 if cmd:
233 ret_data = rnode.vtysh_cmd(cmd, isjson=isjson)
234
235 if isjson:
236 rnode.vtysh_cmd(cmd.rstrip("json"), isjson=False)
237
238 return ret_data
239
240 else:
241 raise InvalidCLIError("No actual cmd passed")
242
243
244 def apply_raw_config(tgen, input_dict):
245
246 """
247 API to configure raw configuration on device. This can be used for any cli
248 which has not been implemented in JSON.
249
250 Parameters
251 ----------
252 * `tgen`: tgen object
253 * `input_dict`: configuration that needs to be applied
254
255 Usage
256 -----
257 input_dict = {
258 "r2": {
259 "raw_config": [
260 "router bgp",
261 "no bgp update-group-split-horizon"
262 ]
263 }
264 }
265 Returns
266 -------
267 True or errormsg
268 """
269
270 rlist = []
271
272 for router_name in input_dict.keys():
273 config_cmd = input_dict[router_name]["raw_config"]
274
275 if not isinstance(config_cmd, list):
276 config_cmd = [config_cmd]
277
278 frr_cfg_file = "{}/{}/{}".format(tgen.logdir, router_name, FRRCFG_FILE)
279 with open(frr_cfg_file, "w") as cfg:
280 for cmd in config_cmd:
281 cfg.write("{}\n".format(cmd))
282
283 rlist.append(router_name)
284
285 # Load config on all routers
286 return load_config_to_routers(tgen, rlist)
287
288
289 def create_common_configurations(
290 tgen, config_dict, config_type=None, build=False, load_config=True
291 ):
292 """
293 API to create object of class FRRConfig and also create frr_json.conf
294 file. It will create interface and common configurations and save it to
295 frr_json.conf and load to router
296 Parameters
297 ----------
298 * `tgen`: tgen object
299 * `config_dict`: Configuration data saved in a dict of { router: config-list }
300 * `routers` : list of router id to be configured.
301 * `config_type` : Syntactic information while writing configuration. Should
302 be one of the value as mentioned in the config_map below.
303 * `build` : Only for initial setup phase this is set as True
304 Returns
305 -------
306 True or False
307 """
308
309 config_map = OrderedDict(
310 {
311 "general_config": "! FRR General Config\n",
312 "debug_log_config": "! Debug log Config\n",
313 "interface_config": "! Interfaces Config\n",
314 "static_route": "! Static Route Config\n",
315 "prefix_list": "! Prefix List Config\n",
316 "bgp_community_list": "! Community List Config\n",
317 "route_maps": "! Route Maps Config\n",
318 "bgp": "! BGP Config\n",
319 "vrf": "! VRF Config\n",
320 "ospf": "! OSPF Config\n",
321 "ospf6": "! OSPF Config\n",
322 "pim": "! PIM Config\n",
323 }
324 )
325
326 if build:
327 mode = "a"
328 elif not load_config:
329 mode = "a"
330 else:
331 mode = "w"
332
333 routers = config_dict.keys()
334 for router in routers:
335 fname = "{}/{}/{}".format(tgen.logdir, router, FRRCFG_FILE)
336 try:
337 frr_cfg_fd = open(fname, mode)
338 if config_type:
339 frr_cfg_fd.write(config_map[config_type])
340 for line in config_dict[router]:
341 frr_cfg_fd.write("{} \n".format(str(line)))
342 frr_cfg_fd.write("\n")
343
344 except IOError as err:
345 logger.error("Unable to open FRR Config '%s': %s" % (fname, str(err)))
346 return False
347 finally:
348 frr_cfg_fd.close()
349
350 # If configuration applied from build, it will done at last
351 result = True
352 if not build and load_config:
353 result = load_config_to_routers(tgen, routers)
354
355 return result
356
357
358 def create_common_configuration(
359 tgen, router, data, config_type=None, build=False, load_config=True
360 ):
361 """
362 API to create object of class FRRConfig and also create frr_json.conf
363 file. It will create interface and common configurations and save it to
364 frr_json.conf and load to router
365 Parameters
366 ----------
367 * `tgen`: tgen object
368 * `data`: Configuration data saved in a list.
369 * `router` : router id to be configured.
370 * `config_type` : Syntactic information while writing configuration. Should
371 be one of the value as mentioned in the config_map below.
372 * `build` : Only for initial setup phase this is set as True
373 Returns
374 -------
375 True or False
376 """
377 return create_common_configurations(
378 tgen, {router: data}, config_type, build, load_config
379 )
380
381
382 def kill_router_daemons(tgen, router, daemons, save_config=True):
383 """
384 Router's current config would be saved to /etc/frr/ for each daemon
385 and daemon would be killed forcefully using SIGKILL.
386 * `tgen` : topogen object
387 * `router`: Device under test
388 * `daemons`: list of daemons to be killed
389 """
390
391 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
392
393 try:
394 router_list = tgen.routers()
395
396 if save_config:
397 # Saving router config to /etc/frr, which will be loaded to router
398 # when it starts
399 router_list[router].vtysh_cmd("write memory")
400
401 # Kill Daemons
402 result = router_list[router].killDaemons(daemons)
403 if len(result) > 0:
404 assert "Errors found post shutdown - details follow:" == 0, result
405 return result
406
407 except Exception as e:
408 errormsg = traceback.format_exc()
409 logger.error(errormsg)
410 return errormsg
411
412
413 def start_router_daemons(tgen, router, daemons):
414 """
415 Daemons defined by user would be started
416 * `tgen` : topogen object
417 * `router`: Device under test
418 * `daemons`: list of daemons to be killed
419 """
420
421 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
422
423 try:
424 router_list = tgen.routers()
425
426 # Start daemons
427 res = router_list[router].startDaemons(daemons)
428
429 except Exception as e:
430 errormsg = traceback.format_exc()
431 logger.error(errormsg)
432 res = errormsg
433
434 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
435 return res
436
437
438 def check_router_status(tgen):
439 """
440 Check if all daemons are running for all routers in topology
441 * `tgen` : topogen object
442 """
443
444 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
445
446 try:
447 router_list = tgen.routers()
448 for router, rnode in router_list.items():
449
450 result = rnode.check_router_running()
451 if result != "":
452 daemons = []
453 if "bgpd" in result:
454 daemons.append("bgpd")
455 if "zebra" in result:
456 daemons.append("zebra")
457 if "pimd" in result:
458 daemons.append("pimd")
459 if "pim6d" in result:
460 daemons.append("pim6d")
461 if "ospfd" in result:
462 daemons.append("ospfd")
463 if "ospf6d" in result:
464 daemons.append("ospf6d")
465 rnode.startDaemons(daemons)
466
467 except Exception as e:
468 errormsg = traceback.format_exc()
469 logger.error(errormsg)
470 return errormsg
471
472 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
473 return True
474
475
476 def save_initial_config_on_routers(tgen):
477 """Save current configuration on routers to FRRCFG_BKUP_FILE.
478
479 FRRCFG_BKUP_FILE is the file that will be restored when `reset_config_on_routers()`
480 is called.
481
482 Parameters
483 ----------
484 * `tgen` : Topogen object
485 """
486 router_list = tgen.routers()
487 target_cfg_fmt = tgen.logdir + "/{}/frr_json_initial.conf"
488
489 # Get all running configs in parallel
490 procs = {}
491 for rname in router_list:
492 logger.info("Fetching running config for router %s", rname)
493 procs[rname] = router_list[rname].popen(
494 ["/usr/bin/env", "vtysh", "-c", "show running-config no-header"],
495 stdin=None,
496 stdout=open(target_cfg_fmt.format(rname), "w"),
497 stderr=subprocess.PIPE,
498 )
499 for rname, p in procs.items():
500 _, error = p.communicate()
501 if p.returncode:
502 logger.error(
503 "Get running config for %s failed %d: %s", rname, p.returncode, error
504 )
505 raise InvalidCLIError(
506 "vtysh show running error on {}: {}".format(rname, error)
507 )
508
509
510 def reset_config_on_routers(tgen, routerName=None):
511 """
512 Resets configuration on routers to the snapshot created using input JSON
513 file. It replaces existing router configuration with FRRCFG_BKUP_FILE
514
515 Parameters
516 ----------
517 * `tgen` : Topogen object
518 * `routerName` : router config is to be reset
519 """
520
521 logger.debug("Entering API: reset_config_on_routers")
522
523 tgen.cfg_gen += 1
524 gen = tgen.cfg_gen
525
526 # Trim the router list if needed
527 router_list = tgen.routers()
528 if routerName:
529 if routerName not in router_list:
530 logger.warning(
531 "Exiting API: reset_config_on_routers: no router %s",
532 routerName,
533 exc_info=True,
534 )
535 return True
536 router_list = {routerName: router_list[routerName]}
537
538 delta_fmt = tgen.logdir + "/{}/delta-{}.conf"
539 # FRRCFG_BKUP_FILE
540 target_cfg_fmt = tgen.logdir + "/{}/frr_json_initial.conf"
541 run_cfg_fmt = tgen.logdir + "/{}/frr-{}.sav"
542
543 #
544 # Get all running configs in parallel
545 #
546 procs = {}
547 for rname in router_list:
548 logger.info("Fetching running config for router %s", rname)
549 procs[rname] = router_list[rname].popen(
550 ["/usr/bin/env", "vtysh", "-c", "show running-config no-header"],
551 stdin=None,
552 stdout=open(run_cfg_fmt.format(rname, gen), "w"),
553 stderr=subprocess.PIPE,
554 )
555 for rname, p in procs.items():
556 _, error = p.communicate()
557 if p.returncode:
558 logger.error(
559 "Get running config for %s failed %d: %s", rname, p.returncode, error
560 )
561 raise InvalidCLIError(
562 "vtysh show running error on {}: {}".format(rname, error)
563 )
564
565 #
566 # Get all delta's in parallel
567 #
568 procs = {}
569 for rname in router_list:
570 logger.info(
571 "Generating delta for router %s to new configuration (gen %d)", rname, gen
572 )
573 procs[rname] = tgen.net.popen(
574 [
575 "/usr/lib/frr/frr-reload.py",
576 "--test-reset",
577 "--input",
578 run_cfg_fmt.format(rname, gen),
579 "--test",
580 target_cfg_fmt.format(rname),
581 ],
582 stdin=None,
583 stdout=open(delta_fmt.format(rname, gen), "w"),
584 stderr=subprocess.PIPE,
585 )
586 for rname, p in procs.items():
587 _, error = p.communicate()
588 if p.returncode:
589 logger.error(
590 "Delta file creation for %s failed %d: %s", rname, p.returncode, error
591 )
592 raise InvalidCLIError("frr-reload error for {}: {}".format(rname, error))
593
594 #
595 # Apply all the deltas in parallel
596 #
597 procs = {}
598 for rname in router_list:
599 logger.info("Applying delta config on router %s", rname)
600
601 procs[rname] = router_list[rname].popen(
602 ["/usr/bin/env", "vtysh", "-f", delta_fmt.format(rname, gen)],
603 stdin=None,
604 stdout=subprocess.PIPE,
605 stderr=subprocess.STDOUT,
606 )
607 for rname, p in procs.items():
608 output, _ = p.communicate()
609 vtysh_command = "vtysh -f {}".format(delta_fmt.format(rname, gen))
610 if not p.returncode:
611 router_list[rname].logger.info(
612 '\nvtysh config apply => "{}"\nvtysh output <= "{}"'.format(
613 vtysh_command, output
614 )
615 )
616 else:
617 router_list[rname].logger.warning(
618 '\nvtysh config apply failed => "{}"\nvtysh output <= "{}"'.format(
619 vtysh_command, output
620 )
621 )
622 logger.error(
623 "Delta file apply for %s failed %d: %s", rname, p.returncode, output
624 )
625
626 # We really need to enable this failure; however, currently frr-reload.py
627 # producing invalid "no" commands as it just preprends "no", but some of the
628 # command forms lack matching values (e.g., final values). Until frr-reload
629 # is fixed to handle this (or all the CLI no forms are adjusted) we can't
630 # fail tests.
631 # raise InvalidCLIError("frr-reload error for {}: {}".format(rname, output))
632
633 #
634 # Optionally log all new running config if "show_router_config" is defined in
635 # "pytest.ini"
636 #
637 if show_router_config:
638 procs = {}
639 for rname in router_list:
640 logger.info("Fetching running config for router %s", rname)
641 procs[rname] = router_list[rname].popen(
642 ["/usr/bin/env", "vtysh", "-c", "show running-config no-header"],
643 stdin=None,
644 stdout=subprocess.PIPE,
645 stderr=subprocess.STDOUT,
646 )
647 for rname, p in procs.items():
648 output, _ = p.communicate()
649 if p.returncode:
650 logger.warning(
651 "Get running config for %s failed %d: %s",
652 rname,
653 p.returncode,
654 output,
655 )
656 else:
657 logger.info(
658 "Configuration on router %s after reset:\n%s", rname, output
659 )
660
661 logger.debug("Exiting API: reset_config_on_routers")
662 return True
663
664
665 def prep_load_config_to_routers(tgen, *config_name_list):
666 """Create common config for `load_config_to_routers`.
667
668 The common config file is constructed from the list of sub-config files passed as
669 position arguments to this function. Each entry in `config_name_list` is looked for
670 under the router sub-directory in the test directory and those files are
671 concatenated together to create the common config. e.g.,
672
673 # Routers are "r1" and "r2", test file is `example/test_example_foo.py`
674 prepare_load_config_to_routers(tgen, "bgpd.conf", "ospfd.conf")
675
676 When the above call is made the files in
677
678 example/r1/bgpd.conf
679 example/r1/ospfd.conf
680
681 Are concat'd together into a single config file that will be loaded on r1, and
682
683 example/r2/bgpd.conf
684 example/r2/ospfd.conf
685
686 Are concat'd together into a single config file that will be loaded on r2 when
687 the call to `load_config_to_routers` is made.
688 """
689
690 routers = tgen.routers()
691 for rname, router in routers.items():
692 destname = "{}/{}/{}".format(tgen.logdir, rname, FRRCFG_FILE)
693 wmode = "w"
694 for cfbase in config_name_list:
695 script_dir = os.environ["PYTEST_TOPOTEST_SCRIPTDIR"]
696 confname = os.path.join(script_dir, "{}/{}".format(rname, cfbase))
697 with open(confname, "r") as cf:
698 with open(destname, wmode) as df:
699 df.write(cf.read())
700 wmode = "a"
701
702
703 def load_config_to_routers(tgen, routers, save_bkup=False):
704 """
705 Loads configuration on routers from the file FRRCFG_FILE.
706
707 Parameters
708 ----------
709 * `tgen` : Topogen object
710 * `routers` : routers for which configuration is to be loaded
711 * `save_bkup` : If True, Saves snapshot of FRRCFG_FILE to FRRCFG_BKUP_FILE
712 Returns
713 -------
714 True or False
715 """
716
717 logger.debug("Entering API: load_config_to_routers")
718
719 tgen.cfg_gen += 1
720 gen = tgen.cfg_gen
721
722 base_router_list = tgen.routers()
723 router_list = {}
724 for router in routers:
725 if router not in base_router_list:
726 continue
727 router_list[router] = base_router_list[router]
728
729 frr_cfg_file_fmt = tgen.logdir + "/{}/" + FRRCFG_FILE
730 frr_cfg_save_file_fmt = tgen.logdir + "/{}/{}-" + FRRCFG_FILE
731 frr_cfg_bkup_fmt = tgen.logdir + "/{}/" + FRRCFG_BKUP_FILE
732
733 procs = {}
734 for rname in router_list:
735 router = router_list[rname]
736 try:
737 frr_cfg_file = frr_cfg_file_fmt.format(rname)
738 frr_cfg_save_file = frr_cfg_save_file_fmt.format(rname, gen)
739 frr_cfg_bkup = frr_cfg_bkup_fmt.format(rname)
740 with open(frr_cfg_file, "r+") as cfg:
741 data = cfg.read()
742 logger.info(
743 "Applying following configuration on router %s (gen: %d):\n%s",
744 rname,
745 gen,
746 data,
747 )
748 # Always save a copy of what we just did
749 with open(frr_cfg_save_file, "w") as bkup:
750 bkup.write(data)
751 if save_bkup:
752 with open(frr_cfg_bkup, "w") as bkup:
753 bkup.write(data)
754 procs[rname] = router_list[rname].popen(
755 ["/usr/bin/env", "vtysh", "-f", frr_cfg_file],
756 stdin=None,
757 stdout=subprocess.PIPE,
758 stderr=subprocess.STDOUT,
759 )
760 except IOError as err:
761 logger.error(
762 "Unable to open config File. error(%s): %s", err.errno, err.strerror
763 )
764 return False
765 except Exception as error:
766 logger.error("Unable to apply config on %s: %s", rname, str(error))
767 return False
768
769 errors = []
770 for rname, p in procs.items():
771 output, _ = p.communicate()
772 frr_cfg_file = frr_cfg_file_fmt.format(rname)
773 vtysh_command = "vtysh -f " + frr_cfg_file
774 if not p.returncode:
775 router_list[rname].logger.info(
776 '\nvtysh config apply => "{}"\nvtysh output <= "{}"'.format(
777 vtysh_command, output
778 )
779 )
780 else:
781 router_list[rname].logger.error(
782 '\nvtysh config apply failed => "{}"\nvtysh output <= "{}"'.format(
783 vtysh_command, output
784 )
785 )
786 logger.error(
787 "Config apply for %s failed %d: %s", rname, p.returncode, output
788 )
789 # We can't thorw an exception here as we won't clear the config file.
790 errors.append(
791 InvalidCLIError(
792 "load_config_to_routers error for {}: {}".format(rname, output)
793 )
794 )
795
796 # Empty the config file or we append to it next time through.
797 with open(frr_cfg_file, "r+") as cfg:
798 cfg.truncate(0)
799
800 # Router current configuration to log file or console if
801 # "show_router_config" is defined in "pytest.ini"
802 if show_router_config:
803 procs = {}
804 for rname in router_list:
805 procs[rname] = router_list[rname].popen(
806 ["/usr/bin/env", "vtysh", "-c", "show running-config no-header"],
807 stdin=None,
808 stdout=subprocess.PIPE,
809 stderr=subprocess.STDOUT,
810 )
811 for rname, p in procs.items():
812 output, _ = p.communicate()
813 if p.returncode:
814 logger.warning(
815 "Get running config for %s failed %d: %s",
816 rname,
817 p.returncode,
818 output,
819 )
820 else:
821 logger.info("New configuration for router %s:\n%s", rname, output)
822
823 logger.debug("Exiting API: load_config_to_routers")
824 return not errors
825
826
827 def load_config_to_router(tgen, routerName, save_bkup=False):
828 """
829 Loads configuration on router from the file FRRCFG_FILE.
830
831 Parameters
832 ----------
833 * `tgen` : Topogen object
834 * `routerName` : router for which configuration to be loaded
835 * `save_bkup` : If True, Saves snapshot of FRRCFG_FILE to FRRCFG_BKUP_FILE
836 """
837 return load_config_to_routers(tgen, [routerName], save_bkup)
838
839
840 def reset_with_new_configs(tgen, *cflist):
841 """Reset the router to initial config, then load new configs.
842
843 Resets routers to the initial config state (see `save_initial_config_on_routers()
844 and `reset_config_on_routers()` `), then concat list of router sub-configs together
845 and load onto the routers (see `prep_load_config_to_routers()` and
846 `load_config_to_routers()`)
847 """
848 routers = tgen.routers()
849
850 reset_config_on_routers(tgen)
851 prep_load_config_to_routers(tgen, *cflist)
852 load_config_to_routers(tgen, tgen.routers(), save_bkup=False)
853
854
855 def get_frr_ipv6_linklocal(tgen, router, intf=None, vrf=None):
856 """
857 API to get the link local ipv6 address of a particular interface using
858 FRR command 'show interface'
859
860 * `tgen`: tgen object
861 * `router` : router for which highest interface should be
862 calculated
863 * `intf` : interface for which link-local address needs to be taken
864 * `vrf` : VRF name
865
866 Usage
867 -----
868 linklocal = get_frr_ipv6_linklocal(tgen, router, "intf1", RED_A)
869
870 Returns
871 -------
872 1) array of interface names to link local ips.
873 """
874
875 router_list = tgen.routers()
876 for rname, rnode in router_list.items():
877 if rname != router:
878 continue
879
880 linklocal = []
881
882 if vrf:
883 cmd = "show interface vrf {}".format(vrf)
884 else:
885 cmd = "show interface"
886
887 linklocal = []
888 if vrf:
889 cmd = "show interface vrf {}".format(vrf)
890 else:
891 cmd = "show interface"
892 for chk_ll in range(0, 60):
893 sleep(1 / 4)
894 ifaces = router_list[router].run('vtysh -c "{}"'.format(cmd))
895 # Fix newlines (make them all the same)
896 ifaces = ("\n".join(ifaces.splitlines()) + "\n").splitlines()
897
898 interface = None
899 ll_per_if_count = 0
900 for line in ifaces:
901 # Interface name
902 m = re_search("Interface ([a-zA-Z0-9-]+) is", line)
903 if m:
904 interface = m.group(1).split(" ")[0]
905 ll_per_if_count = 0
906
907 # Interface ip
908 m1 = re_search("inet6 (fe80[:a-fA-F0-9]+/[0-9]+)", line)
909 if m1:
910 local = m1.group(1)
911 ll_per_if_count += 1
912 if ll_per_if_count > 1:
913 linklocal += [["%s-%s" % (interface, ll_per_if_count), local]]
914 else:
915 linklocal += [[interface, local]]
916
917 try:
918 if linklocal:
919 if intf:
920 return [
921 _linklocal[1]
922 for _linklocal in linklocal
923 if _linklocal[0] == intf
924 ][0].split("/")[0]
925 return linklocal
926 except IndexError:
927 continue
928
929 errormsg = "Link local ip missing on router {}".format(router)
930 return errormsg
931
932
933 def generate_support_bundle():
934 """
935 API to generate support bundle on any verification ste failure.
936 it runs a python utility, /usr/lib/frr/generate_support_bundle.py,
937 which basically runs defined CLIs and dumps the data to specified location
938 """
939
940 tgen = get_topogen()
941 router_list = tgen.routers()
942 test_name = os.environ.get("PYTEST_CURRENT_TEST").split(":")[-1].split(" ")[0]
943
944 bundle_procs = {}
945 for rname, rnode in router_list.items():
946 logger.info("Spawn collection of support bundle for %s", rname)
947 dst_bundle = "{}/{}/support_bundles/{}".format(tgen.logdir, rname, test_name)
948 rnode.run("mkdir -p " + dst_bundle)
949
950 gen_sup_cmd = [
951 "/usr/lib/frr/generate_support_bundle.py",
952 "--log-dir=" + dst_bundle,
953 ]
954 bundle_procs[rname] = tgen.net[rname].popen(gen_sup_cmd, stdin=None)
955
956 for rname, rnode in router_list.items():
957 logger.info("Waiting on support bundle for %s", rname)
958 output, error = bundle_procs[rname].communicate()
959 if output:
960 logger.info(
961 "Output from collecting support bundle for %s:\n%s", rname, output
962 )
963 if error:
964 logger.warning(
965 "Error from collecting support bundle for %s:\n%s", rname, error
966 )
967
968 return True
969
970
971 def start_topology(tgen):
972 """
973 Starting topology, create tmp files which are loaded to routers
974 to start daemons and then start routers
975 * `tgen` : topogen object
976 """
977
978 # Starting topology
979 tgen.start_topology()
980
981 # Starting daemons
982
983 router_list = tgen.routers()
984 routers_sorted = sorted(
985 router_list.keys(), key=lambda x: int(re_search("[0-9]+", x).group(0))
986 )
987
988 linux_ver = ""
989 router_list = tgen.routers()
990 for rname in routers_sorted:
991 router = router_list[rname]
992
993 # It will help in debugging the failures, will give more details on which
994 # specific kernel version tests are failing
995 if linux_ver == "":
996 linux_ver = router.run("uname -a")
997 logger.info("Logging platform related details: \n %s \n", linux_ver)
998
999 try:
1000 os.chdir(tgen.logdir)
1001
1002 # # Creating router named dir and empty zebra.conf bgpd.conf files
1003 # # inside the current directory
1004 # if os.path.isdir("{}".format(rname)):
1005 # os.system("rm -rf {}".format(rname))
1006 # os.mkdir("{}".format(rname))
1007 # os.system("chmod -R go+rw {}".format(rname))
1008 # os.chdir("{}/{}".format(tgen.logdir, rname))
1009 # os.system("touch zebra.conf bgpd.conf")
1010 # else:
1011 # os.mkdir("{}".format(rname))
1012 # os.system("chmod -R go+rw {}".format(rname))
1013 # os.chdir("{}/{}".format(tgen.logdir, rname))
1014 # os.system("touch zebra.conf bgpd.conf")
1015
1016 except IOError as err:
1017 logger.error("I/O error({0}): {1}".format(err.errno, err.strerror))
1018
1019 topo = tgen.json_topo
1020 feature = set()
1021
1022 if "feature" in topo:
1023 feature.update(topo["feature"])
1024
1025 if rname in topo["routers"]:
1026 for key in topo["routers"][rname].keys():
1027 feature.add(key)
1028
1029 for val in topo["routers"][rname]["links"].values():
1030 if "pim" in val:
1031 feature.add("pim")
1032 break
1033 for val in topo["routers"][rname]["links"].values():
1034 if "pim6" in val:
1035 feature.add("pim6")
1036 break
1037 for val in topo["routers"][rname]["links"].values():
1038 if "ospf6" in val:
1039 feature.add("ospf6")
1040 break
1041 if "switches" in topo and rname in topo["switches"]:
1042 for val in topo["switches"][rname]["links"].values():
1043 if "ospf" in val:
1044 feature.add("ospf")
1045 break
1046 if "ospf6" in val:
1047 feature.add("ospf6")
1048 break
1049
1050 # Loading empty zebra.conf file to router, to start the zebra deamon
1051 router.load_config(
1052 TopoRouter.RD_ZEBRA, "{}/{}/zebra.conf".format(tgen.logdir, rname)
1053 )
1054
1055 # Loading empty bgpd.conf file to router, to start the bgp deamon
1056 if "bgp" in feature:
1057 router.load_config(
1058 TopoRouter.RD_BGP, "{}/{}/bgpd.conf".format(tgen.logdir, rname)
1059 )
1060
1061 # Loading empty pimd.conf file to router, to start the pim deamon
1062 if "pim" in feature:
1063 router.load_config(
1064 TopoRouter.RD_PIM, "{}/{}/pimd.conf".format(tgen.logdir, rname)
1065 )
1066
1067 # Loading empty pimd.conf file to router, to start the pim deamon
1068 if "pim6" in feature:
1069 router.load_config(
1070 TopoRouter.RD_PIM6, "{}/{}/pim6d.conf".format(tgen.logdir, rname)
1071 )
1072
1073 if "ospf" in feature:
1074 # Loading empty ospf.conf file to router, to start the ospf deamon
1075 router.load_config(
1076 TopoRouter.RD_OSPF, "{}/{}/ospfd.conf".format(tgen.logdir, rname)
1077 )
1078
1079 if "ospf6" in feature:
1080 # Loading empty ospf.conf file to router, to start the ospf deamon
1081 router.load_config(
1082 TopoRouter.RD_OSPF6, "{}/{}/ospf6d.conf".format(tgen.logdir, rname)
1083 )
1084
1085 # Starting routers
1086 logger.info("Starting all routers once topology is created")
1087 tgen.start_router()
1088
1089
1090 def stop_router(tgen, router):
1091 """
1092 Router"s current config would be saved to /tmp/topotest/<suite>/<router> for each daemon
1093 and router and its daemons would be stopped.
1094
1095 * `tgen` : topogen object
1096 * `router`: Device under test
1097 """
1098
1099 router_list = tgen.routers()
1100
1101 # Saving router config to /etc/frr, which will be loaded to router
1102 # when it starts
1103 router_list[router].vtysh_cmd("write memory")
1104
1105 # Stop router
1106 router_list[router].stop()
1107
1108
1109 def start_router(tgen, router):
1110 """
1111 Router will be started and config would be loaded from /tmp/topotest/<suite>/<router> for each
1112 daemon
1113
1114 * `tgen` : topogen object
1115 * `router`: Device under test
1116 """
1117
1118 logger.debug("Entering lib API: start_router")
1119
1120 try:
1121 router_list = tgen.routers()
1122
1123 # Router and its daemons would be started and config would
1124 # be loaded to router for each daemon from /etc/frr
1125 router_list[router].start()
1126
1127 # Waiting for router to come up
1128 sleep(5)
1129
1130 except Exception as e:
1131 errormsg = traceback.format_exc()
1132 logger.error(errormsg)
1133 return errormsg
1134
1135 logger.debug("Exiting lib API: start_router()")
1136 return True
1137
1138
1139 def number_to_row(routerName):
1140 """
1141 Returns the number for the router.
1142 Calculation based on name a0 = row 0, a1 = row 1, b2 = row 2, z23 = row 23
1143 etc
1144 """
1145 return int(routerName[1:])
1146
1147
1148 def number_to_column(routerName):
1149 """
1150 Returns the number for the router.
1151 Calculation based on name a0 = columnn 0, a1 = column 0, b2= column 1,
1152 z23 = column 26 etc
1153 """
1154 return ord(routerName[0]) - 97
1155
1156
1157 def topo_daemons(tgen, topo=None):
1158 """
1159 Returns daemon list required for the suite based on topojson.
1160 """
1161 daemon_list = []
1162
1163 if topo is None:
1164 topo = tgen.json_topo
1165
1166 router_list = tgen.routers()
1167 routers_sorted = sorted(
1168 router_list.keys(), key=lambda x: int(re_search("[0-9]+", x).group(0))
1169 )
1170
1171 for rtr in routers_sorted:
1172 if "ospf" in topo["routers"][rtr] and "ospfd" not in daemon_list:
1173 daemon_list.append("ospfd")
1174
1175 if "ospf6" in topo["routers"][rtr] and "ospf6d" not in daemon_list:
1176 daemon_list.append("ospf6d")
1177
1178 for val in topo["routers"][rtr]["links"].values():
1179 if "pim" in val and "pimd" not in daemon_list:
1180 daemon_list.append("pimd")
1181 if "pim6" in val and "pim6d" not in daemon_list:
1182 daemon_list.append("pim6d")
1183 if "ospf" in val and "ospfd" not in daemon_list:
1184 daemon_list.append("ospfd")
1185 if "ospf6" in val and "ospf6d" not in daemon_list:
1186 daemon_list.append("ospf6d")
1187 break
1188
1189 return daemon_list
1190
1191
1192 def add_interfaces_to_vlan(tgen, input_dict):
1193 """
1194 Add interfaces to VLAN, we need vlan pakcage to be installed on machine
1195
1196 * `tgen`: tgen onject
1197 * `input_dict` : interfaces to be added to vlans
1198
1199 input_dict= {
1200 "r1":{
1201 "vlan":{
1202 VLAN_1: [{
1203 intf_r1_s1: {
1204 "ip": "10.1.1.1",
1205 "subnet": "255.255.255.0
1206 }
1207 }]
1208 }
1209 }
1210 }
1211
1212 add_interfaces_to_vlan(tgen, input_dict)
1213
1214 """
1215
1216 router_list = tgen.routers()
1217 for dut in input_dict.keys():
1218 rnode = router_list[dut]
1219
1220 if "vlan" in input_dict[dut]:
1221 for vlan, interfaces in input_dict[dut]["vlan"].items():
1222 for intf_dict in interfaces:
1223 for interface, data in intf_dict.items():
1224 # Adding interface to VLAN
1225 vlan_intf = "{}.{}".format(interface, vlan)
1226 cmd = "ip link add link {} name {} type vlan id {}".format(
1227 interface, vlan_intf, vlan
1228 )
1229 logger.info("[DUT: %s]: Running command: %s", dut, cmd)
1230 result = rnode.run(cmd)
1231 logger.info("result %s", result)
1232
1233 # Bringing interface up
1234 cmd = "ip link set {} up".format(vlan_intf)
1235 logger.info("[DUT: %s]: Running command: %s", dut, cmd)
1236 result = rnode.run(cmd)
1237 logger.info("result %s", result)
1238
1239 # Assigning IP address
1240 ifaddr = ipaddress.ip_interface(
1241 "{}/{}".format(
1242 frr_unicode(data["ip"]), frr_unicode(data["subnet"])
1243 )
1244 )
1245
1246 cmd = "ip -{0} a flush {1} scope global && ip a add {2} dev {1} && ip l set {1} up".format(
1247 ifaddr.version, vlan_intf, ifaddr
1248 )
1249 logger.info("[DUT: %s]: Running command: %s", dut, cmd)
1250 result = rnode.run(cmd)
1251 logger.info("result %s", result)
1252
1253
1254 def tcpdump_capture_start(
1255 tgen,
1256 router,
1257 intf,
1258 protocol=None,
1259 grepstr=None,
1260 timeout=0,
1261 options=None,
1262 cap_file=None,
1263 background=True,
1264 ):
1265 """
1266 API to capture network packets using tcp dump.
1267
1268 Packages used :
1269
1270 Parameters
1271 ----------
1272 * `tgen`: topogen object.
1273 * `router`: router on which ping has to be performed.
1274 * `intf` : interface for capture.
1275 * `protocol` : protocol for which packet needs to be captured.
1276 * `grepstr` : string to filter out tcp dump output.
1277 * `timeout` : Time for which packet needs to be captured.
1278 * `options` : options for TCP dump, all tcpdump options can be used.
1279 * `cap_file` : filename to store capture dump.
1280 * `background` : Make tcp dump run in back ground.
1281
1282 Usage
1283 -----
1284 tcpdump_result = tcpdump_dut(tgen, 'r2', intf, protocol='tcp', timeout=20,
1285 options='-A -vv -x > r2bgp.txt ')
1286 Returns
1287 -------
1288 1) True for successful capture
1289 2) errormsg - when tcp dump fails
1290 """
1291
1292 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
1293
1294 rnode = tgen.gears[router]
1295
1296 if timeout > 0:
1297 cmd = "timeout {}".format(timeout)
1298 else:
1299 cmd = ""
1300
1301 cmdargs = "{} tcpdump".format(cmd)
1302
1303 if intf:
1304 cmdargs += " -i {}".format(str(intf))
1305 if protocol:
1306 cmdargs += " {}".format(str(protocol))
1307 if options:
1308 cmdargs += " -s 0 {}".format(str(options))
1309
1310 if cap_file:
1311 file_name = os.path.join(tgen.logdir, router, cap_file)
1312 cmdargs += " -w {}".format(str(file_name))
1313 # Remove existing capture file
1314 rnode.run("rm -rf {}".format(file_name))
1315
1316 if grepstr:
1317 cmdargs += ' | grep "{}"'.format(str(grepstr))
1318
1319 logger.info("Running tcpdump command: [%s]", cmdargs)
1320 if not background:
1321 rnode.run(cmdargs)
1322 else:
1323 # XXX this & is bogus doesn't work
1324 # rnode.run("nohup {} & /dev/null 2>&1".format(cmdargs))
1325 rnode.run("nohup {} > /dev/null 2>&1".format(cmdargs))
1326
1327 # Check if tcpdump process is running
1328 if background:
1329 result = rnode.run("pgrep tcpdump")
1330 logger.debug("ps -ef | grep tcpdump \n {}".format(result))
1331
1332 if not result:
1333 errormsg = "tcpdump is not running {}".format("tcpdump")
1334 return errormsg
1335 else:
1336 logger.info("Packet capture started on %s: interface %s", router, intf)
1337
1338 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
1339 return True
1340
1341
1342 def tcpdump_capture_stop(tgen, router):
1343 """
1344 API to capture network packets using tcp dump.
1345
1346 Packages used :
1347
1348 Parameters
1349 ----------
1350 * `tgen`: topogen object.
1351 * `router`: router on which ping has to be performed.
1352 * `intf` : interface for capture.
1353 * `protocol` : protocol for which packet needs to be captured.
1354 * `grepstr` : string to filter out tcp dump output.
1355 * `timeout` : Time for which packet needs to be captured.
1356 * `options` : options for TCP dump, all tcpdump options can be used.
1357 * `cap2file` : filename to store capture dump.
1358 * `bakgrnd` : Make tcp dump run in back ground.
1359
1360 Usage
1361 -----
1362 tcpdump_result = tcpdump_dut(tgen, 'r2', intf, protocol='tcp', timeout=20,
1363 options='-A -vv -x > r2bgp.txt ')
1364 Returns
1365 -------
1366 1) True for successful capture
1367 2) errormsg - when tcp dump fails
1368 """
1369
1370 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
1371
1372 rnode = tgen.gears[router]
1373
1374 # Check if tcpdump process is running
1375 result = rnode.run("ps -ef | grep tcpdump")
1376 logger.debug("ps -ef | grep tcpdump \n {}".format(result))
1377
1378 if not re_search(r"{}".format("tcpdump"), result):
1379 errormsg = "tcpdump is not running {}".format("tcpdump")
1380 return errormsg
1381 else:
1382 # XXX this doesn't work with micronet
1383 ppid = tgen.net.nameToNode[rnode.name].pid
1384 rnode.run("set +m; pkill -P %s tcpdump &> /dev/null" % ppid)
1385 logger.info("Stopped tcpdump capture")
1386
1387 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
1388 return True
1389
1390
1391 def create_debug_log_config(tgen, input_dict, build=False):
1392 """
1393 Enable/disable debug logs for any protocol with defined debug
1394 options and logs would be saved to created log file
1395
1396 Parameters
1397 ----------
1398 * `tgen` : Topogen object
1399 * `input_dict` : details to enable debug logs for protocols
1400 * `build` : Only for initial setup phase this is set as True.
1401
1402
1403 Usage:
1404 ------
1405 input_dict = {
1406 "r2": {
1407 "debug":{
1408 "log_file" : "debug.log",
1409 "enable": ["pimd", "zebra"],
1410 "disable": {
1411 "bgpd":[
1412 'debug bgp neighbor-events',
1413 'debug bgp updates',
1414 'debug bgp zebra',
1415 ]
1416 }
1417 }
1418 }
1419 }
1420
1421 result = create_debug_log_config(tgen, input_dict)
1422
1423 Returns
1424 -------
1425 True or False
1426 """
1427
1428 result = False
1429 try:
1430 debug_config_dict = {}
1431
1432 for router in input_dict.keys():
1433 debug_config = []
1434 if "debug" in input_dict[router]:
1435 debug_dict = input_dict[router]["debug"]
1436
1437 disable_logs = debug_dict.setdefault("disable", None)
1438 enable_logs = debug_dict.setdefault("enable", None)
1439 log_file = debug_dict.setdefault("log_file", None)
1440
1441 if log_file:
1442 _log_file = os.path.join(tgen.logdir, log_file)
1443 debug_config.append("log file {} \n".format(_log_file))
1444
1445 if type(enable_logs) is list:
1446 for daemon in enable_logs:
1447 for debug_log in DEBUG_LOGS[daemon]:
1448 debug_config.append("{}".format(debug_log))
1449 elif type(enable_logs) is dict:
1450 for daemon, debug_logs in enable_logs.items():
1451 for debug_log in debug_logs:
1452 debug_config.append("{}".format(debug_log))
1453
1454 if type(disable_logs) is list:
1455 for daemon in disable_logs:
1456 for debug_log in DEBUG_LOGS[daemon]:
1457 debug_config.append("no {}".format(debug_log))
1458 elif type(disable_logs) is dict:
1459 for daemon, debug_logs in disable_logs.items():
1460 for debug_log in debug_logs:
1461 debug_config.append("no {}".format(debug_log))
1462 if debug_config:
1463 debug_config_dict[router] = debug_config
1464
1465 result = create_common_configurations(
1466 tgen, debug_config_dict, "debug_log_config", build=build
1467 )
1468 except InvalidCLIError:
1469 # Traceback
1470 errormsg = traceback.format_exc()
1471 logger.error(errormsg)
1472 return errormsg
1473
1474 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
1475 return result
1476
1477
1478 #############################################
1479 # Common APIs, will be used by all protocols
1480 #############################################
1481
1482
1483 def create_vrf_cfg(tgen, topo, input_dict=None, build=False):
1484 """
1485 Create vrf configuration for created topology. VRF
1486 configuration is provided in input json file.
1487
1488 VRF config is done in Linux Kernel:
1489 * Create VRF
1490 * Attach interface to VRF
1491 * Bring up VRF
1492
1493 Parameters
1494 ----------
1495 * `tgen` : Topogen object
1496 * `topo` : json file data
1497 * `input_dict` : Input dict data, required when configuring
1498 from testcase
1499 * `build` : Only for initial setup phase this is set as True.
1500
1501 Usage
1502 -----
1503 input_dict={
1504 "r3": {
1505 "links": {
1506 "r2-link1": {"ipv4": "auto", "ipv6": "auto", "vrf": "RED_A"},
1507 "r2-link2": {"ipv4": "auto", "ipv6": "auto", "vrf": "RED_B"},
1508 "r2-link3": {"ipv4": "auto", "ipv6": "auto", "vrf": "BLUE_A"},
1509 "r2-link4": {"ipv4": "auto", "ipv6": "auto", "vrf": "BLUE_B"},
1510 },
1511 "vrfs":[
1512 {
1513 "name": "RED_A",
1514 "id": "1"
1515 },
1516 {
1517 "name": "RED_B",
1518 "id": "2"
1519 },
1520 {
1521 "name": "BLUE_A",
1522 "id": "3",
1523 "delete": True
1524 },
1525 {
1526 "name": "BLUE_B",
1527 "id": "4"
1528 }
1529 ]
1530 }
1531 }
1532 result = create_vrf_cfg(tgen, topo, input_dict)
1533
1534 Returns
1535 -------
1536 True or False
1537 """
1538 result = True
1539 if not input_dict:
1540 input_dict = deepcopy(topo)
1541 else:
1542 input_dict = deepcopy(input_dict)
1543
1544 try:
1545 config_data_dict = {}
1546
1547 for c_router, c_data in input_dict.items():
1548 rnode = tgen.gears[c_router]
1549 config_data = []
1550 if "vrfs" in c_data:
1551 for vrf in c_data["vrfs"]:
1552 name = vrf.setdefault("name", None)
1553 table_id = vrf.setdefault("id", None)
1554 del_action = vrf.setdefault("delete", False)
1555
1556 if del_action:
1557 # Kernel cmd- Add VRF and table
1558 cmd = "ip link del {} type vrf table {}".format(
1559 vrf["name"], vrf["id"]
1560 )
1561
1562 logger.info("[DUT: %s]: Running kernel cmd [%s]", c_router, cmd)
1563 rnode.run(cmd)
1564
1565 # Kernel cmd - Bring down VRF
1566 cmd = "ip link set dev {} down".format(name)
1567 logger.info("[DUT: %s]: Running kernel cmd [%s]", c_router, cmd)
1568 rnode.run(cmd)
1569
1570 else:
1571 if name and table_id:
1572 # Kernel cmd- Add VRF and table
1573 cmd = "ip link add {} type vrf table {}".format(
1574 name, table_id
1575 )
1576 logger.info(
1577 "[DUT: %s]: Running kernel cmd " "[%s]", c_router, cmd
1578 )
1579 rnode.run(cmd)
1580
1581 # Kernel cmd - Bring up VRF
1582 cmd = "ip link set dev {} up".format(name)
1583 logger.info(
1584 "[DUT: %s]: Running kernel " "cmd [%s]", c_router, cmd
1585 )
1586 rnode.run(cmd)
1587
1588 for vrf in c_data["vrfs"]:
1589 vni = vrf.setdefault("vni", None)
1590 del_vni = vrf.setdefault("no_vni", None)
1591
1592 if "links" in c_data:
1593 for destRouterLink, data in sorted(c_data["links"].items()):
1594 # Loopback interfaces
1595 if "type" in data and data["type"] == "loopback":
1596 interface_name = destRouterLink
1597 else:
1598 interface_name = data["interface"]
1599
1600 if "vrf" in data:
1601 vrf_list = data["vrf"]
1602
1603 if type(vrf_list) is not list:
1604 vrf_list = [vrf_list]
1605
1606 for _vrf in vrf_list:
1607 cmd = "ip link set {} master {}".format(
1608 interface_name, _vrf
1609 )
1610
1611 logger.info(
1612 "[DUT: %s]: Running" " kernel cmd [%s]",
1613 c_router,
1614 cmd,
1615 )
1616 rnode.run(cmd)
1617
1618 if vni:
1619 config_data.append("vrf {}".format(vrf["name"]))
1620 cmd = "vni {}".format(vni)
1621 config_data.append(cmd)
1622
1623 if del_vni:
1624 config_data.append("vrf {}".format(vrf["name"]))
1625 cmd = "no vni {}".format(del_vni)
1626 config_data.append(cmd)
1627
1628 if config_data:
1629 config_data_dict[c_router] = config_data
1630
1631 result = create_common_configurations(
1632 tgen, config_data_dict, "vrf", build=build
1633 )
1634
1635 except InvalidCLIError:
1636 # Traceback
1637 errormsg = traceback.format_exc()
1638 logger.error(errormsg)
1639 return errormsg
1640
1641 return result
1642
1643
1644 def create_interface_in_kernel(
1645 tgen, dut, name, ip_addr, vrf=None, netmask=None, create=True
1646 ):
1647 """
1648 Cretae interfaces in kernel for ipv4/ipv6
1649 Config is done in Linux Kernel:
1650
1651 Parameters
1652 ----------
1653 * `tgen` : Topogen object
1654 * `dut` : Device for which interfaces to be added
1655 * `name` : interface name
1656 * `ip_addr` : ip address for interface
1657 * `vrf` : VRF name, to which interface will be associated
1658 * `netmask` : netmask value, default is None
1659 * `create`: Create interface in kernel, if created then no need
1660 to create
1661 """
1662
1663 rnode = tgen.gears[dut]
1664
1665 if create:
1666 cmd = "ip link show {0} >/dev/null || ip link add {0} type dummy".format(name)
1667 rnode.run(cmd)
1668
1669 if not netmask:
1670 ifaddr = ipaddress.ip_interface(frr_unicode(ip_addr))
1671 else:
1672 ifaddr = ipaddress.ip_interface(
1673 "{}/{}".format(frr_unicode(ip_addr), frr_unicode(netmask))
1674 )
1675 cmd = "ip -{0} a flush {1} scope global && ip a add {2} dev {1} && ip l set {1} up".format(
1676 ifaddr.version, name, ifaddr
1677 )
1678 logger.info("[DUT: %s]: Running command: %s", dut, cmd)
1679 rnode.run(cmd)
1680
1681 if vrf:
1682 cmd = "ip link set {} master {}".format(name, vrf)
1683 rnode.run(cmd)
1684
1685
1686 def shutdown_bringup_interface_in_kernel(tgen, dut, intf_name, ifaceaction=False):
1687 """
1688 Cretae interfaces in kernel for ipv4/ipv6
1689 Config is done in Linux Kernel:
1690
1691 Parameters
1692 ----------
1693 * `tgen` : Topogen object
1694 * `dut` : Device for which interfaces to be added
1695 * `intf_name` : interface name
1696 * `ifaceaction` : False to shutdown and True to bringup the
1697 ineterface
1698 """
1699
1700 rnode = tgen.gears[dut]
1701
1702 cmd = "ip link set dev"
1703 if ifaceaction:
1704 action = "up"
1705 cmd = "{} {} {}".format(cmd, intf_name, action)
1706 else:
1707 action = "down"
1708 cmd = "{} {} {}".format(cmd, intf_name, action)
1709
1710 logger.info("[DUT: %s]: Running command: %s", dut, cmd)
1711 rnode.run(cmd)
1712
1713
1714 def validate_ip_address(ip_address):
1715 """
1716 Validates the type of ip address
1717 Parameters
1718 ----------
1719 * `ip_address`: IPv4/IPv6 address
1720 Returns
1721 -------
1722 Type of address as string
1723 """
1724
1725 if "/" in ip_address:
1726 ip_address = ip_address.split("/")[0]
1727
1728 v4 = True
1729 v6 = True
1730 try:
1731 socket.inet_aton(ip_address)
1732 except socket.error as error:
1733 logger.debug("Not a valid IPv4 address")
1734 v4 = False
1735 else:
1736 return "ipv4"
1737
1738 try:
1739 socket.inet_pton(socket.AF_INET6, ip_address)
1740 except socket.error as error:
1741 logger.debug("Not a valid IPv6 address")
1742 v6 = False
1743 else:
1744 return "ipv6"
1745
1746 if not v4 and not v6:
1747 raise Exception(
1748 "InvalidIpAddr", "%s is neither valid IPv4 or IPv6" " address" % ip_address
1749 )
1750
1751
1752 def check_address_types(addr_type=None):
1753 """
1754 Checks environment variable set and compares with the current address type
1755 """
1756
1757 addr_types_env = os.environ.get("ADDRESS_TYPES")
1758 if not addr_types_env:
1759 addr_types_env = "dual"
1760
1761 if addr_types_env == "dual":
1762 addr_types = ["ipv4", "ipv6"]
1763 elif addr_types_env == "ipv4":
1764 addr_types = ["ipv4"]
1765 elif addr_types_env == "ipv6":
1766 addr_types = ["ipv6"]
1767
1768 if addr_type is None:
1769 return addr_types
1770
1771 if addr_type not in addr_types:
1772 logger.debug(
1773 "{} not in supported/configured address types {}".format(
1774 addr_type, addr_types
1775 )
1776 )
1777 return False
1778
1779 return True
1780
1781
1782 def generate_ips(network, no_of_ips):
1783 """
1784 Returns list of IPs.
1785 based on start_ip and no_of_ips
1786
1787 * `network` : from here the ip will start generating,
1788 start_ip will be
1789 * `no_of_ips` : these many IPs will be generated
1790 """
1791 ipaddress_list = []
1792 if type(network) is not list:
1793 network = [network]
1794
1795 for start_ipaddr in network:
1796 if "/" in start_ipaddr:
1797 start_ip = start_ipaddr.split("/")[0]
1798 mask = int(start_ipaddr.split("/")[1])
1799 else:
1800 logger.debug("start_ipaddr {} must have a / in it".format(start_ipaddr))
1801 assert 0
1802
1803 addr_type = validate_ip_address(start_ip)
1804 if addr_type == "ipv4":
1805 if start_ip == "0.0.0.0" and mask == 0 and no_of_ips == 1:
1806 ipaddress_list.append("{}/{}".format(start_ip, mask))
1807 return ipaddress_list
1808 start_ip = ipaddress.IPv4Address(frr_unicode(start_ip))
1809 step = 2 ** (32 - mask)
1810 elif addr_type == "ipv6":
1811 if start_ip == "0::0" and mask == 0 and no_of_ips == 1:
1812 ipaddress_list.append("{}/{}".format(start_ip, mask))
1813 return ipaddress_list
1814 start_ip = ipaddress.IPv6Address(frr_unicode(start_ip))
1815 step = 2 ** (128 - mask)
1816 else:
1817 return []
1818
1819 next_ip = start_ip
1820 count = 0
1821 while count < no_of_ips:
1822 ipaddress_list.append("{}/{}".format(next_ip, mask))
1823 if addr_type == "ipv6":
1824 next_ip = ipaddress.IPv6Address(int(next_ip) + step)
1825 else:
1826 next_ip += step
1827 count += 1
1828
1829 return ipaddress_list
1830
1831
1832 def find_interface_with_greater_ip(topo, router, loopback=True, interface=True):
1833 """
1834 Returns highest interface ip for ipv4/ipv6. If loopback is there then
1835 it will return highest IP from loopback IPs otherwise from physical
1836 interface IPs.
1837 * `topo` : json file data
1838 * `router` : router for which highest interface should be calculated
1839 """
1840
1841 link_data = topo["routers"][router]["links"]
1842 lo_list = []
1843 interfaces_list = []
1844 lo_exists = False
1845 for destRouterLink, data in sorted(link_data.items()):
1846 if loopback:
1847 if "type" in data and data["type"] == "loopback":
1848 lo_exists = True
1849 ip_address = topo["routers"][router]["links"][destRouterLink][
1850 "ipv4"
1851 ].split("/")[0]
1852 lo_list.append(ip_address)
1853 if interface:
1854 ip_address = topo["routers"][router]["links"][destRouterLink]["ipv4"].split(
1855 "/"
1856 )[0]
1857 interfaces_list.append(ip_address)
1858
1859 if lo_exists:
1860 return sorted(lo_list)[-1]
1861
1862 return sorted(interfaces_list)[-1]
1863
1864
1865 def write_test_header(tc_name):
1866 """Display message at beginning of test case"""
1867 count = 20
1868 logger.info("*" * (len(tc_name) + count))
1869 step("START -> Testcase : %s" % tc_name, reset=True)
1870 logger.info("*" * (len(tc_name) + count))
1871
1872
1873 def write_test_footer(tc_name):
1874 """Display message at end of test case"""
1875 count = 21
1876 logger.info("=" * (len(tc_name) + count))
1877 logger.info("Testcase : %s -> PASSED", tc_name)
1878 logger.info("=" * (len(tc_name) + count))
1879
1880
1881 def interface_status(tgen, topo, input_dict):
1882 """
1883 Delete ip route maps from device
1884 * `tgen` : Topogen object
1885 * `topo` : json file data
1886 * `input_dict` : for which router, route map has to be deleted
1887 Usage
1888 -----
1889 input_dict = {
1890 "r3": {
1891 "interface_list": ['eth1-r1-r2', 'eth2-r1-r3'],
1892 "status": "down"
1893 }
1894 }
1895 Returns
1896 -------
1897 errormsg(str) or True
1898 """
1899 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
1900
1901 try:
1902 rlist = []
1903
1904 for router in input_dict.keys():
1905
1906 interface_list = input_dict[router]["interface_list"]
1907 status = input_dict[router].setdefault("status", "up")
1908 for intf in interface_list:
1909 rnode = tgen.gears[router]
1910 interface_set_status(rnode, intf, status)
1911
1912 rlist.append(router)
1913
1914 # Load config to routers
1915 load_config_to_routers(tgen, rlist)
1916
1917 except Exception as e:
1918 errormsg = traceback.format_exc()
1919 logger.error(errormsg)
1920 return errormsg
1921
1922 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
1923 return True
1924
1925
1926 def retry(retry_timeout, initial_wait=0, expected=True, diag_pct=0.75):
1927 """
1928 Fixture: Retries function while it's return value is an errormsg (str), False, or it raises an exception.
1929
1930 * `retry_timeout`: Retry for at least this many seconds; after waiting initial_wait seconds
1931 * `initial_wait`: Sleeps for this many seconds before first executing function
1932 * `expected`: if False then the return logic is inverted, except for exceptions,
1933 (i.e., a False or errmsg (str) function return ends the retry loop,
1934 and returns that False or str value)
1935 * `diag_pct`: Percentage of `retry_timeout` to keep testing after negative result would have
1936 been returned in order to see if a positive result comes after. This is an
1937 important diagnostic tool, and normally should not be disabled. Calls to wrapped
1938 functions though, can override the `diag_pct` value to make it larger in case more
1939 diagnostic retrying is appropriate.
1940 """
1941
1942 def _retry(func):
1943 @wraps(func)
1944 def func_retry(*args, **kwargs):
1945 # We will continue to retry diag_pct of the timeout value to see if test would have passed with a
1946 # longer retry timeout value.
1947 saved_failure = None
1948
1949 retry_sleep = 2
1950
1951 # Allow the wrapped function's args to override the fixtures
1952 _retry_timeout = kwargs.pop("retry_timeout", retry_timeout)
1953 _expected = kwargs.pop("expected", expected)
1954 _initial_wait = kwargs.pop("initial_wait", initial_wait)
1955 _diag_pct = kwargs.pop("diag_pct", diag_pct)
1956
1957 start_time = datetime.now()
1958 retry_until = datetime.now() + timedelta(
1959 seconds=_retry_timeout + _initial_wait
1960 )
1961
1962 if initial_wait > 0:
1963 logger.info("Waiting for [%s]s as initial delay", initial_wait)
1964 sleep(initial_wait)
1965
1966 invert_logic = not _expected
1967 while True:
1968 seconds_left = (retry_until - datetime.now()).total_seconds()
1969 try:
1970 ret = func(*args, **kwargs)
1971 logger.debug("Function returned %s", ret)
1972
1973 negative_result = ret is False or is_string(ret)
1974 if negative_result == invert_logic:
1975 # Simple case, successful result in time
1976 if not saved_failure:
1977 return ret
1978
1979 # Positive result, but happened after timeout failure, very important to
1980 # note for fixing tests.
1981 logger.warning(
1982 "RETRY DIAGNOSTIC: SUCCEED after FAILED with requested timeout of %.1fs; however, succeeded in %.1fs, investigate timeout timing",
1983 _retry_timeout,
1984 (datetime.now() - start_time).total_seconds(),
1985 )
1986 if isinstance(saved_failure, Exception):
1987 raise saved_failure # pylint: disable=E0702
1988 return saved_failure
1989
1990 except Exception as error:
1991 logger.info("Function raised exception: %s", str(error))
1992 ret = error
1993
1994 if seconds_left < 0 and saved_failure:
1995 logger.info(
1996 "RETRY DIAGNOSTIC: Retry timeout reached, still failing"
1997 )
1998 if isinstance(saved_failure, Exception):
1999 raise saved_failure # pylint: disable=E0702
2000 return saved_failure
2001
2002 if seconds_left < 0:
2003 logger.info("Retry timeout of %ds reached", _retry_timeout)
2004
2005 saved_failure = ret
2006 retry_extra_delta = timedelta(
2007 seconds=seconds_left + _retry_timeout * _diag_pct
2008 )
2009 retry_until = datetime.now() + retry_extra_delta
2010 seconds_left = retry_extra_delta.total_seconds()
2011
2012 # Generate bundle after setting remaining diagnostic retry time
2013 generate_support_bundle()
2014
2015 # If user has disabled diagnostic retries return now
2016 if not _diag_pct:
2017 if isinstance(saved_failure, Exception):
2018 raise saved_failure
2019 return saved_failure
2020
2021 if saved_failure:
2022 logger.info(
2023 "RETRY DIAG: [failure] Sleeping %ds until next retry with %.1f retry time left - too see if timeout was too short",
2024 retry_sleep,
2025 seconds_left,
2026 )
2027 else:
2028 logger.info(
2029 "Sleeping %ds until next retry with %.1f retry time left",
2030 retry_sleep,
2031 seconds_left,
2032 )
2033 sleep(retry_sleep)
2034
2035 func_retry._original = func
2036 return func_retry
2037
2038 return _retry
2039
2040
2041 class Stepper:
2042 """
2043 Prints step number for the test case step being executed
2044 """
2045
2046 count = 1
2047
2048 def __call__(self, msg, reset):
2049 if reset:
2050 Stepper.count = 1
2051 logger.info(msg)
2052 else:
2053 logger.info("STEP %s: '%s'", Stepper.count, msg)
2054 Stepper.count += 1
2055
2056
2057 def step(msg, reset=False):
2058 """
2059 Call Stepper to print test steps. Need to reset at the beginning of test.
2060 * ` msg` : Step message body.
2061 * `reset` : Reset step count to 1 when set to True.
2062 """
2063 _step = Stepper()
2064 _step(msg, reset)
2065
2066
2067 def do_countdown(secs):
2068 """
2069 Countdown timer display
2070 """
2071 for i in range(secs, 0, -1):
2072 sys.stdout.write("{} ".format(str(i)))
2073 sys.stdout.flush()
2074 sleep(1)
2075 return
2076
2077
2078 #############################################
2079 # These APIs, will used by testcase
2080 #############################################
2081 def create_interfaces_cfg(tgen, topo, build=False):
2082 """
2083 Create interface configuration for created topology. Basic Interface
2084 configuration is provided in input json file.
2085
2086 Parameters
2087 ----------
2088 * `tgen` : Topogen object
2089 * `topo` : json file data
2090 * `build` : Only for initial setup phase this is set as True.
2091
2092 Returns
2093 -------
2094 True or False
2095 """
2096
2097 def _create_interfaces_ospf_cfg(ospf, c_data, data, ospf_keywords):
2098 interface_data = []
2099 ip_ospf = "ipv6 ospf6" if ospf == "ospf6" else "ip ospf"
2100 for keyword in ospf_keywords:
2101 if keyword in data[ospf]:
2102 intf_ospf_value = c_data["links"][destRouterLink][ospf][keyword]
2103 if "delete" in data and data["delete"]:
2104 interface_data.append(
2105 "no {} {}".format(ip_ospf, keyword.replace("_", "-"))
2106 )
2107 else:
2108 interface_data.append(
2109 "{} {} {}".format(
2110 ip_ospf, keyword.replace("_", "-"), intf_ospf_value
2111 )
2112 )
2113 return interface_data
2114
2115 result = False
2116 topo = deepcopy(topo)
2117
2118 try:
2119 interface_data_dict = {}
2120
2121 for c_router, c_data in topo.items():
2122 interface_data = []
2123 for destRouterLink, data in sorted(c_data["links"].items()):
2124 # Loopback interfaces
2125 if "type" in data and data["type"] == "loopback":
2126 interface_name = destRouterLink
2127 else:
2128 interface_name = data["interface"]
2129
2130 interface_data.append("interface {}".format(str(interface_name)))
2131
2132 if "ipv4" in data:
2133 intf_addr = c_data["links"][destRouterLink]["ipv4"]
2134
2135 if "delete" in data and data["delete"]:
2136 interface_data.append("no ip address {}".format(intf_addr))
2137 else:
2138 interface_data.append("ip address {}".format(intf_addr))
2139 if "ipv6" in data:
2140 intf_addr = c_data["links"][destRouterLink]["ipv6"]
2141
2142 if "delete" in data and data["delete"]:
2143 interface_data.append("no ipv6 address {}".format(intf_addr))
2144 else:
2145 interface_data.append("ipv6 address {}".format(intf_addr))
2146
2147 # Wait for vrf interfaces to get link local address once they are up
2148 if (
2149 not destRouterLink == "lo"
2150 and "vrf" in topo[c_router]["links"][destRouterLink]
2151 ):
2152 vrf = topo[c_router]["links"][destRouterLink]["vrf"]
2153 intf = topo[c_router]["links"][destRouterLink]["interface"]
2154 ll = get_frr_ipv6_linklocal(tgen, c_router, intf=intf, vrf=vrf)
2155
2156 if "ipv6-link-local" in data:
2157 intf_addr = c_data["links"][destRouterLink]["ipv6-link-local"]
2158
2159 if "delete" in data and data["delete"]:
2160 interface_data.append("no ipv6 address {}".format(intf_addr))
2161 else:
2162 interface_data.append("ipv6 address {}\n".format(intf_addr))
2163
2164 ospf_keywords = [
2165 "hello_interval",
2166 "dead_interval",
2167 "network",
2168 "priority",
2169 "cost",
2170 "mtu_ignore",
2171 ]
2172 if "ospf" in data:
2173 interface_data += _create_interfaces_ospf_cfg(
2174 "ospf", c_data, data, ospf_keywords + ["area"]
2175 )
2176 if "ospf6" in data:
2177 interface_data += _create_interfaces_ospf_cfg(
2178 "ospf6", c_data, data, ospf_keywords + ["area"]
2179 )
2180 if interface_data:
2181 interface_data_dict[c_router] = interface_data
2182
2183 result = create_common_configurations(
2184 tgen, interface_data_dict, "interface_config", build=build
2185 )
2186
2187 except InvalidCLIError:
2188 # Traceback
2189 errormsg = traceback.format_exc()
2190 logger.error(errormsg)
2191 return errormsg
2192
2193 return result
2194
2195
2196 def create_static_routes(tgen, input_dict, build=False):
2197 """
2198 Create static routes for given router as defined in input_dict
2199
2200 Parameters
2201 ----------
2202 * `tgen` : Topogen object
2203 * `input_dict` : Input dict data, required when configuring from testcase
2204 * `build` : Only for initial setup phase this is set as True.
2205
2206 Usage
2207 -----
2208 input_dict should be in the format below:
2209 # static_routes: list of all routes
2210 # network: network address
2211 # no_of_ip: number of next-hop address that will be configured
2212 # admin_distance: admin distance for route/routes.
2213 # next_hop: starting next-hop address
2214 # tag: tag id for static routes
2215 # vrf: VRF name in which static routes needs to be created
2216 # delete: True if config to be removed. Default False.
2217
2218 Example:
2219 "routers": {
2220 "r1": {
2221 "static_routes": [
2222 {
2223 "network": "100.0.20.1/32",
2224 "no_of_ip": 9,
2225 "admin_distance": 100,
2226 "next_hop": "10.0.0.1",
2227 "tag": 4001,
2228 "vrf": "RED_A"
2229 "delete": true
2230 }
2231 ]
2232 }
2233 }
2234
2235 Returns
2236 -------
2237 errormsg(str) or True
2238 """
2239 result = False
2240 logger.debug("Entering lib API: create_static_routes()")
2241 input_dict = deepcopy(input_dict)
2242
2243 try:
2244 static_routes_list_dict = {}
2245
2246 for router in input_dict.keys():
2247 if "static_routes" not in input_dict[router]:
2248 errormsg = "static_routes not present in input_dict"
2249 logger.info(errormsg)
2250 continue
2251
2252 static_routes_list = []
2253
2254 static_routes = input_dict[router]["static_routes"]
2255 for static_route in static_routes:
2256 del_action = static_route.setdefault("delete", False)
2257 no_of_ip = static_route.setdefault("no_of_ip", 1)
2258 network = static_route.setdefault("network", [])
2259 if type(network) is not list:
2260 network = [network]
2261
2262 admin_distance = static_route.setdefault("admin_distance", None)
2263 tag = static_route.setdefault("tag", None)
2264 vrf = static_route.setdefault("vrf", None)
2265 interface = static_route.setdefault("interface", None)
2266 next_hop = static_route.setdefault("next_hop", None)
2267 nexthop_vrf = static_route.setdefault("nexthop_vrf", None)
2268
2269 ip_list = generate_ips(network, no_of_ip)
2270 for ip in ip_list:
2271 addr_type = validate_ip_address(ip)
2272
2273 if addr_type == "ipv4":
2274 cmd = "ip route {}".format(ip)
2275 else:
2276 cmd = "ipv6 route {}".format(ip)
2277
2278 if interface:
2279 cmd = "{} {}".format(cmd, interface)
2280
2281 if next_hop:
2282 cmd = "{} {}".format(cmd, next_hop)
2283
2284 if nexthop_vrf:
2285 cmd = "{} nexthop-vrf {}".format(cmd, nexthop_vrf)
2286
2287 if vrf:
2288 cmd = "{} vrf {}".format(cmd, vrf)
2289
2290 if tag:
2291 cmd = "{} tag {}".format(cmd, str(tag))
2292
2293 if admin_distance:
2294 cmd = "{} {}".format(cmd, admin_distance)
2295
2296 if del_action:
2297 cmd = "no {}".format(cmd)
2298
2299 static_routes_list.append(cmd)
2300
2301 if static_routes_list:
2302 static_routes_list_dict[router] = static_routes_list
2303
2304 result = create_common_configurations(
2305 tgen, static_routes_list_dict, "static_route", build=build
2306 )
2307
2308 except InvalidCLIError:
2309 # Traceback
2310 errormsg = traceback.format_exc()
2311 logger.error(errormsg)
2312 return errormsg
2313
2314 logger.debug("Exiting lib API: create_static_routes()")
2315 return result
2316
2317
2318 def create_prefix_lists(tgen, input_dict, build=False):
2319 """
2320 Create ip prefix lists as per the config provided in input
2321 JSON or input_dict
2322 Parameters
2323 ----------
2324 * `tgen` : Topogen object
2325 * `input_dict` : Input dict data, required when configuring from testcase
2326 * `build` : Only for initial setup phase this is set as True.
2327 Usage
2328 -----
2329 # pf_lists_1: name of prefix-list, user defined
2330 # seqid: prefix-list seqid, auto-generated if not given by user
2331 # network: criteria for applying prefix-list
2332 # action: permit/deny
2333 # le: less than or equal number of bits
2334 # ge: greater than or equal number of bits
2335 Example
2336 -------
2337 input_dict = {
2338 "r1": {
2339 "prefix_lists":{
2340 "ipv4": {
2341 "pf_list_1": [
2342 {
2343 "seqid": 10,
2344 "network": "any",
2345 "action": "permit",
2346 "le": "32",
2347 "ge": "30",
2348 "delete": True
2349 }
2350 ]
2351 }
2352 }
2353 }
2354 }
2355 Returns
2356 -------
2357 errormsg or True
2358 """
2359
2360 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
2361 result = False
2362 try:
2363 config_data_dict = {}
2364
2365 for router in input_dict.keys():
2366 if "prefix_lists" not in input_dict[router]:
2367 errormsg = "prefix_lists not present in input_dict"
2368 logger.debug(errormsg)
2369 continue
2370
2371 config_data = []
2372 prefix_lists = input_dict[router]["prefix_lists"]
2373 for addr_type, prefix_data in prefix_lists.items():
2374 if not check_address_types(addr_type):
2375 continue
2376
2377 for prefix_name, prefix_list in prefix_data.items():
2378 for prefix_dict in prefix_list:
2379 if "action" not in prefix_dict or "network" not in prefix_dict:
2380 errormsg = "'action' or network' missing in" " input_dict"
2381 return errormsg
2382
2383 network_addr = prefix_dict["network"]
2384 action = prefix_dict["action"]
2385 le = prefix_dict.setdefault("le", None)
2386 ge = prefix_dict.setdefault("ge", None)
2387 seqid = prefix_dict.setdefault("seqid", None)
2388 del_action = prefix_dict.setdefault("delete", False)
2389 if seqid is None:
2390 seqid = get_seq_id("prefix_lists", router, prefix_name)
2391 else:
2392 set_seq_id("prefix_lists", router, seqid, prefix_name)
2393
2394 if addr_type == "ipv4":
2395 protocol = "ip"
2396 else:
2397 protocol = "ipv6"
2398
2399 cmd = "{} prefix-list {} seq {} {} {}".format(
2400 protocol, prefix_name, seqid, action, network_addr
2401 )
2402 if le:
2403 cmd = "{} le {}".format(cmd, le)
2404 if ge:
2405 cmd = "{} ge {}".format(cmd, ge)
2406
2407 if del_action:
2408 cmd = "no {}".format(cmd)
2409
2410 config_data.append(cmd)
2411 if config_data:
2412 config_data_dict[router] = config_data
2413
2414 result = create_common_configurations(
2415 tgen, config_data_dict, "prefix_list", build=build
2416 )
2417
2418 except InvalidCLIError:
2419 # Traceback
2420 errormsg = traceback.format_exc()
2421 logger.error(errormsg)
2422 return errormsg
2423
2424 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
2425 return result
2426
2427
2428 def create_route_maps(tgen, input_dict, build=False):
2429 """
2430 Create route-map on the devices as per the arguments passed
2431 Parameters
2432 ----------
2433 * `tgen` : Topogen object
2434 * `input_dict` : Input dict data, required when configuring from testcase
2435 * `build` : Only for initial setup phase this is set as True.
2436 Usage
2437 -----
2438 # route_maps: key, value pair for route-map name and its attribute
2439 # rmap_match_prefix_list_1: user given name for route-map
2440 # action: PERMIT/DENY
2441 # match: key,value pair for match criteria. prefix_list, community-list,
2442 large-community-list or tag. Only one option at a time.
2443 # prefix_list: name of prefix list
2444 # large-community-list: name of large community list
2445 # community-ist: name of community list
2446 # tag: tag id for static routes
2447 # set: key, value pair for modifying route attributes
2448 # localpref: preference value for the network
2449 # med: metric value advertised for AS
2450 # aspath: set AS path value
2451 # weight: weight for the route
2452 # community: standard community value to be attached
2453 # large_community: large community value to be attached
2454 # community_additive: if set to "additive", adds community/large-community
2455 value to the existing values of the network prefix
2456 Example:
2457 --------
2458 input_dict = {
2459 "r1": {
2460 "route_maps": {
2461 "rmap_match_prefix_list_1": [
2462 {
2463 "action": "PERMIT",
2464 "match": {
2465 "ipv4": {
2466 "prefix_list": "pf_list_1"
2467 }
2468 "ipv6": {
2469 "prefix_list": "pf_list_1"
2470 }
2471 "large-community-list": {
2472 "id": "community_1",
2473 "exact_match": True
2474 }
2475 "community_list": {
2476 "id": "community_2",
2477 "exact_match": True
2478 }
2479 "tag": "tag_id"
2480 },
2481 "set": {
2482 "locPrf": 150,
2483 "metric": 30,
2484 "path": {
2485 "num": 20000,
2486 "action": "prepend",
2487 },
2488 "weight": 500,
2489 "community": {
2490 "num": "1:2 2:3",
2491 "action": additive
2492 }
2493 "large_community": {
2494 "num": "1:2:3 4:5;6",
2495 "action": additive
2496 },
2497 }
2498 }
2499 ]
2500 }
2501 }
2502 }
2503 Returns
2504 -------
2505 errormsg(str) or True
2506 """
2507
2508 result = False
2509 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
2510 input_dict = deepcopy(input_dict)
2511 try:
2512 rmap_data_dict = {}
2513
2514 for router in input_dict.keys():
2515 if "route_maps" not in input_dict[router]:
2516 logger.debug("route_maps not present in input_dict")
2517 continue
2518 rmap_data = []
2519 for rmap_name, rmap_value in input_dict[router]["route_maps"].items():
2520
2521 for rmap_dict in rmap_value:
2522 del_action = rmap_dict.setdefault("delete", False)
2523
2524 if del_action:
2525 rmap_data.append("no route-map {}".format(rmap_name))
2526 continue
2527
2528 if "action" not in rmap_dict:
2529 errormsg = "action not present in input_dict"
2530 logger.error(errormsg)
2531 return False
2532
2533 rmap_action = rmap_dict.setdefault("action", "deny")
2534
2535 seq_id = rmap_dict.setdefault("seq_id", None)
2536 if seq_id is None:
2537 seq_id = get_seq_id("route_maps", router, rmap_name)
2538 else:
2539 set_seq_id("route_maps", router, seq_id, rmap_name)
2540
2541 rmap_data.append(
2542 "route-map {} {} {}".format(rmap_name, rmap_action, seq_id)
2543 )
2544
2545 if "continue" in rmap_dict:
2546 continue_to = rmap_dict["continue"]
2547 if continue_to:
2548 rmap_data.append("on-match goto {}".format(continue_to))
2549 else:
2550 logger.error(
2551 "In continue, 'route-map entry "
2552 "sequence number' is not provided"
2553 )
2554 return False
2555
2556 if "goto" in rmap_dict:
2557 go_to = rmap_dict["goto"]
2558 if go_to:
2559 rmap_data.append("on-match goto {}".format(go_to))
2560 else:
2561 logger.error(
2562 "In goto, 'Goto Clause number' is not" " provided"
2563 )
2564 return False
2565
2566 if "call" in rmap_dict:
2567 call_rmap = rmap_dict["call"]
2568 if call_rmap:
2569 rmap_data.append("call {}".format(call_rmap))
2570 else:
2571 logger.error(
2572 "In call, 'destination Route-Map' is" " not provided"
2573 )
2574 return False
2575
2576 # Verifying if SET criteria is defined
2577 if "set" in rmap_dict:
2578 set_data = rmap_dict["set"]
2579 ipv4_data = set_data.setdefault("ipv4", {})
2580 ipv6_data = set_data.setdefault("ipv6", {})
2581 local_preference = set_data.setdefault("locPrf", None)
2582 metric = set_data.setdefault("metric", None)
2583 metric_type = set_data.setdefault("metric-type", None)
2584 as_path = set_data.setdefault("path", {})
2585 weight = set_data.setdefault("weight", None)
2586 community = set_data.setdefault("community", {})
2587 large_community = set_data.setdefault("large_community", {})
2588 large_comm_list = set_data.setdefault("large_comm_list", {})
2589 set_action = set_data.setdefault("set_action", None)
2590 nexthop = set_data.setdefault("nexthop", None)
2591 origin = set_data.setdefault("origin", None)
2592 ext_comm_list = set_data.setdefault("extcommunity", {})
2593 metrictype = set_data.setdefault("metric-type", {})
2594
2595 # Local Preference
2596 if local_preference:
2597 rmap_data.append(
2598 "set local-preference {}".format(local_preference)
2599 )
2600
2601 # Metric-Type
2602 if metrictype:
2603 rmap_data.append("set metric-type {}\n".format(metrictype))
2604
2605 # Metric
2606 if metric:
2607 del_comm = set_data.setdefault("delete", None)
2608 if del_comm:
2609 rmap_data.append("no set metric {}".format(metric))
2610 else:
2611 rmap_data.append("set metric {}".format(metric))
2612
2613 # Origin
2614 if origin:
2615 rmap_data.append("set origin {} \n".format(origin))
2616
2617 # AS Path Prepend
2618 if as_path:
2619 as_num = as_path.setdefault("as_num", None)
2620 as_action = as_path.setdefault("as_action", None)
2621 if as_action and as_num:
2622 rmap_data.append(
2623 "set as-path {} {}".format(as_action, as_num)
2624 )
2625
2626 # Community
2627 if community:
2628 num = community.setdefault("num", None)
2629 comm_action = community.setdefault("action", None)
2630 if num:
2631 cmd = "set community {}".format(num)
2632 if comm_action:
2633 cmd = "{} {}".format(cmd, comm_action)
2634 rmap_data.append(cmd)
2635 else:
2636 logger.error("In community, AS Num not" " provided")
2637 return False
2638
2639 if large_community:
2640 num = large_community.setdefault("num", None)
2641 comm_action = large_community.setdefault("action", None)
2642 if num:
2643 cmd = "set large-community {}".format(num)
2644 if comm_action:
2645 cmd = "{} {}".format(cmd, comm_action)
2646
2647 rmap_data.append(cmd)
2648 else:
2649 logger.error(
2650 "In large_community, AS Num not" " provided"
2651 )
2652 return False
2653 if large_comm_list:
2654 id = large_comm_list.setdefault("id", None)
2655 del_comm = large_comm_list.setdefault("delete", None)
2656 if id:
2657 cmd = "set large-comm-list {}".format(id)
2658 if del_comm:
2659 cmd = "{} delete".format(cmd)
2660
2661 rmap_data.append(cmd)
2662 else:
2663 logger.error("In large_comm_list 'id' not" " provided")
2664 return False
2665
2666 if ext_comm_list:
2667 rt = ext_comm_list.setdefault("rt", None)
2668 del_comm = ext_comm_list.setdefault("delete", None)
2669 if rt:
2670 cmd = "set extcommunity rt {}".format(rt)
2671 if del_comm:
2672 cmd = "{} delete".format(cmd)
2673
2674 rmap_data.append(cmd)
2675 else:
2676 logger.debug("In ext_comm_list 'rt' not" " provided")
2677 return False
2678
2679 # Weight
2680 if weight:
2681 rmap_data.append("set weight {}".format(weight))
2682 if ipv6_data:
2683 nexthop = ipv6_data.setdefault("nexthop", None)
2684 if nexthop:
2685 rmap_data.append("set ipv6 next-hop {}".format(nexthop))
2686
2687 # Adding MATCH and SET sequence to RMAP if defined
2688 if "match" in rmap_dict:
2689 match_data = rmap_dict["match"]
2690 ipv4_data = match_data.setdefault("ipv4", {})
2691 ipv6_data = match_data.setdefault("ipv6", {})
2692 community = match_data.setdefault("community_list", {})
2693 large_community = match_data.setdefault("large_community", {})
2694 large_community_list = match_data.setdefault(
2695 "large_community_list", {}
2696 )
2697
2698 metric = match_data.setdefault("metric", None)
2699 source_vrf = match_data.setdefault("source-vrf", None)
2700
2701 if ipv4_data:
2702 # fetch prefix list data from rmap
2703 prefix_name = ipv4_data.setdefault("prefix_lists", None)
2704 if prefix_name:
2705 rmap_data.append(
2706 "match ip address"
2707 " prefix-list {}".format(prefix_name)
2708 )
2709
2710 # fetch tag data from rmap
2711 tag = ipv4_data.setdefault("tag", None)
2712 if tag:
2713 rmap_data.append("match tag {}".format(tag))
2714
2715 # fetch large community data from rmap
2716 large_community_list = ipv4_data.setdefault(
2717 "large_community_list", {}
2718 )
2719 large_community = match_data.setdefault(
2720 "large_community", {}
2721 )
2722
2723 if ipv6_data:
2724 prefix_name = ipv6_data.setdefault("prefix_lists", None)
2725 if prefix_name:
2726 rmap_data.append(
2727 "match ipv6 address"
2728 " prefix-list {}".format(prefix_name)
2729 )
2730
2731 # fetch tag data from rmap
2732 tag = ipv6_data.setdefault("tag", None)
2733 if tag:
2734 rmap_data.append("match tag {}".format(tag))
2735
2736 # fetch large community data from rmap
2737 large_community_list = ipv6_data.setdefault(
2738 "large_community_list", {}
2739 )
2740 large_community = match_data.setdefault(
2741 "large_community", {}
2742 )
2743
2744 if community:
2745 if "id" not in community:
2746 logger.error(
2747 "'id' is mandatory for "
2748 "community-list in match"
2749 " criteria"
2750 )
2751 return False
2752 cmd = "match community {}".format(community["id"])
2753 exact_match = community.setdefault("exact_match", False)
2754 if exact_match:
2755 cmd = "{} exact-match".format(cmd)
2756
2757 rmap_data.append(cmd)
2758 if large_community:
2759 if "id" not in large_community:
2760 logger.error(
2761 "'id' is mandatory for "
2762 "large-community-list in match "
2763 "criteria"
2764 )
2765 return False
2766 cmd = "match large-community {}".format(
2767 large_community["id"]
2768 )
2769 exact_match = large_community.setdefault(
2770 "exact_match", False
2771 )
2772 if exact_match:
2773 cmd = "{} exact-match".format(cmd)
2774 rmap_data.append(cmd)
2775 if large_community_list:
2776 if "id" not in large_community_list:
2777 logger.error(
2778 "'id' is mandatory for "
2779 "large-community-list in match "
2780 "criteria"
2781 )
2782 return False
2783 cmd = "match large-community {}".format(
2784 large_community_list["id"]
2785 )
2786 exact_match = large_community_list.setdefault(
2787 "exact_match", False
2788 )
2789 if exact_match:
2790 cmd = "{} exact-match".format(cmd)
2791 rmap_data.append(cmd)
2792
2793 if source_vrf:
2794 cmd = "match source-vrf {}".format(source_vrf)
2795 rmap_data.append(cmd)
2796
2797 if metric:
2798 cmd = "match metric {}".format(metric)
2799 rmap_data.append(cmd)
2800
2801 if rmap_data:
2802 rmap_data_dict[router] = rmap_data
2803
2804 result = create_common_configurations(
2805 tgen, rmap_data_dict, "route_maps", build=build
2806 )
2807
2808 except InvalidCLIError:
2809 # Traceback
2810 errormsg = traceback.format_exc()
2811 logger.error(errormsg)
2812 return errormsg
2813
2814 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
2815 return result
2816
2817
2818 def delete_route_maps(tgen, input_dict):
2819 """
2820 Delete ip route maps from device
2821 * `tgen` : Topogen object
2822 * `input_dict` : for which router,
2823 route map has to be deleted
2824 Usage
2825 -----
2826 # Delete route-map rmap_1 and rmap_2 from router r1
2827 input_dict = {
2828 "r1": {
2829 "route_maps": ["rmap_1", "rmap__2"]
2830 }
2831 }
2832 result = delete_route_maps("ipv4", input_dict)
2833 Returns
2834 -------
2835 errormsg(str) or True
2836 """
2837 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
2838
2839 for router in input_dict.keys():
2840 route_maps = input_dict[router]["route_maps"][:]
2841 rmap_data = input_dict[router]
2842 rmap_data["route_maps"] = {}
2843 for route_map_name in route_maps:
2844 rmap_data["route_maps"].update({route_map_name: [{"delete": True}]})
2845
2846 return create_route_maps(tgen, input_dict)
2847
2848
2849 def create_bgp_community_lists(tgen, input_dict, build=False):
2850 """
2851 Create bgp community-list or large-community-list on the devices as per
2852 the arguments passed. Takes list of communities in input.
2853 Parameters
2854 ----------
2855 * `tgen` : Topogen object
2856 * `input_dict` : Input dict data, required when configuring from testcase
2857 * `build` : Only for initial setup phase this is set as True.
2858 Usage
2859 -----
2860 input_dict_1 = {
2861 "r3": {
2862 "bgp_community_lists": [
2863 {
2864 "community_type": "standard",
2865 "action": "permit",
2866 "name": "rmap_lcomm_{}".format(addr_type),
2867 "value": "1:1:1 1:2:3 2:1:1 2:2:2",
2868 "large": True
2869 }
2870 ]
2871 }
2872 }
2873 }
2874 result = create_bgp_community_lists(tgen, input_dict_1)
2875 """
2876
2877 result = False
2878 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
2879 input_dict = deepcopy(input_dict)
2880 try:
2881 config_data_dict = {}
2882
2883 for router in input_dict.keys():
2884 if "bgp_community_lists" not in input_dict[router]:
2885 errormsg = "bgp_community_lists not present in input_dict"
2886 logger.debug(errormsg)
2887 continue
2888
2889 config_data = []
2890
2891 community_list = input_dict[router]["bgp_community_lists"]
2892 for community_dict in community_list:
2893 del_action = community_dict.setdefault("delete", False)
2894 community_type = community_dict.setdefault("community_type", None)
2895 action = community_dict.setdefault("action", None)
2896 value = community_dict.setdefault("value", "")
2897 large = community_dict.setdefault("large", None)
2898 name = community_dict.setdefault("name", None)
2899 if large:
2900 cmd = "bgp large-community-list"
2901 else:
2902 cmd = "bgp community-list"
2903
2904 if not large and not (community_type and action and value):
2905 errormsg = (
2906 "community_type, action and value are "
2907 "required in bgp_community_list"
2908 )
2909 logger.error(errormsg)
2910 return False
2911
2912 cmd = "{} {} {} {} {}".format(cmd, community_type, name, action, value)
2913
2914 if del_action:
2915 cmd = "no {}".format(cmd)
2916
2917 config_data.append(cmd)
2918
2919 if config_data:
2920 config_data_dict[router] = config_data
2921
2922 result = create_common_configurations(
2923 tgen, config_data_dict, "bgp_community_list", build=build
2924 )
2925
2926 except InvalidCLIError:
2927 # Traceback
2928 errormsg = traceback.format_exc()
2929 logger.error(errormsg)
2930 return errormsg
2931
2932 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
2933 return result
2934
2935
2936 def shutdown_bringup_interface(tgen, dut, intf_name, ifaceaction=False):
2937 """
2938 Shutdown or bringup router's interface "
2939 * `tgen` : Topogen object
2940 * `dut` : Device under test
2941 * `intf_name` : Interface name to be shut/no shut
2942 * `ifaceaction` : Action, to shut/no shut interface,
2943 by default is False
2944 Usage
2945 -----
2946 dut = "r3"
2947 intf = "r3-r1-eth0"
2948 # Shut down interface
2949 shutdown_bringup_interface(tgen, dut, intf, False)
2950 # Bring up interface
2951 shutdown_bringup_interface(tgen, dut, intf, True)
2952 Returns
2953 -------
2954 errormsg(str) or True
2955 """
2956
2957 router_list = tgen.routers()
2958 if ifaceaction:
2959 logger.info("Bringing up interface {} : {}".format(dut, intf_name))
2960 else:
2961 logger.info("Shutting down interface {} : {}".format(dut, intf_name))
2962
2963 interface_set_status(router_list[dut], intf_name, ifaceaction)
2964
2965
2966 def addKernelRoute(
2967 tgen, router, intf, group_addr_range, next_hop=None, src=None, del_action=None
2968 ):
2969 """
2970 Add route to kernel
2971
2972 Parameters:
2973 -----------
2974 * `tgen` : Topogen object
2975 * `router`: router for which kernel routes needs to be added
2976 * `intf`: interface name, for which kernel routes needs to be added
2977 * `bindToAddress`: bind to <host>, an interface or multicast
2978 address
2979
2980 returns:
2981 --------
2982 errormsg or True
2983 """
2984
2985 logger.debug("Entering lib API: addKernelRoute()")
2986
2987 rnode = tgen.gears[router]
2988
2989 if type(group_addr_range) is not list:
2990 group_addr_range = [group_addr_range]
2991
2992 for grp_addr in group_addr_range:
2993
2994 addr_type = validate_ip_address(grp_addr)
2995 if addr_type == "ipv4":
2996 if next_hop is not None:
2997 cmd = "ip route add {} via {}".format(grp_addr, next_hop)
2998 else:
2999 cmd = "ip route add {} dev {}".format(grp_addr, intf)
3000 if del_action:
3001 cmd = "ip route del {}".format(grp_addr)
3002 verify_cmd = "ip route"
3003 elif addr_type == "ipv6":
3004 if intf and src:
3005 cmd = "ip -6 route add {} dev {} src {}".format(grp_addr, intf, src)
3006 else:
3007 cmd = "ip -6 route add {} via {}".format(grp_addr, next_hop)
3008 verify_cmd = "ip -6 route"
3009 if del_action:
3010 cmd = "ip -6 route del {}".format(grp_addr)
3011
3012 logger.info("[DUT: {}]: Running command: [{}]".format(router, cmd))
3013 output = rnode.run(cmd)
3014
3015 def check_in_kernel(rnode, verify_cmd, grp_addr, router):
3016 # Verifying if ip route added to kernel
3017 errormsg = None
3018 result = rnode.run(verify_cmd)
3019 logger.debug("{}\n{}".format(verify_cmd, result))
3020 if "/" in grp_addr:
3021 ip, mask = grp_addr.split("/")
3022 if mask == "32" or mask == "128":
3023 grp_addr = ip
3024 else:
3025 mask = "32" if addr_type == "ipv4" else "128"
3026
3027 if not re_search(r"{}".format(grp_addr), result) and mask != "0":
3028 errormsg = (
3029 "[DUT: {}]: Kernal route is not added for group"
3030 " address {} Config output: {}".format(
3031 router, grp_addr, output
3032 )
3033 )
3034
3035 return errormsg
3036
3037 test_func = functools.partial(
3038 check_in_kernel, rnode, verify_cmd, grp_addr, router
3039 )
3040 (result, out) = topotest.run_and_expect(test_func, None, count=20, wait=1)
3041 assert result, out
3042
3043 logger.debug("Exiting lib API: addKernelRoute()")
3044 return True
3045
3046
3047 def configure_vxlan(tgen, input_dict):
3048 """
3049 Add and configure vxlan
3050
3051 * `tgen`: tgen object
3052 * `input_dict` : data for vxlan config
3053
3054 Usage:
3055 ------
3056 input_dict= {
3057 "dcg2":{
3058 "vxlan":[{
3059 "vxlan_name": "vxlan75100",
3060 "vxlan_id": "75100",
3061 "dstport": 4789,
3062 "local_addr": "120.0.0.1",
3063 "learning": "no",
3064 "delete": True
3065 }]
3066 }
3067 }
3068
3069 configure_vxlan(tgen, input_dict)
3070
3071 Returns:
3072 -------
3073 True or errormsg
3074
3075 """
3076
3077 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
3078
3079 router_list = tgen.routers()
3080 for dut in input_dict.keys():
3081 rnode = router_list[dut]
3082
3083 if "vxlan" in input_dict[dut]:
3084 for vxlan_dict in input_dict[dut]["vxlan"]:
3085 cmd = "ip link "
3086
3087 del_vxlan = vxlan_dict.setdefault("delete", None)
3088 vxlan_names = vxlan_dict.setdefault("vxlan_name", [])
3089 vxlan_ids = vxlan_dict.setdefault("vxlan_id", [])
3090 dstport = vxlan_dict.setdefault("dstport", None)
3091 local_addr = vxlan_dict.setdefault("local_addr", None)
3092 learning = vxlan_dict.setdefault("learning", None)
3093
3094 config_data = []
3095 if vxlan_names and vxlan_ids:
3096 for vxlan_name, vxlan_id in zip(vxlan_names, vxlan_ids):
3097 cmd = "ip link"
3098
3099 if del_vxlan:
3100 cmd = "{} del {} type vxlan id {}".format(
3101 cmd, vxlan_name, vxlan_id
3102 )
3103 else:
3104 cmd = "{} add {} type vxlan id {}".format(
3105 cmd, vxlan_name, vxlan_id
3106 )
3107
3108 if dstport:
3109 cmd = "{} dstport {}".format(cmd, dstport)
3110
3111 if local_addr:
3112 ip_cmd = "ip addr add {} dev {}".format(
3113 local_addr, vxlan_name
3114 )
3115 if del_vxlan:
3116 ip_cmd = "ip addr del {} dev {}".format(
3117 local_addr, vxlan_name
3118 )
3119
3120 config_data.append(ip_cmd)
3121
3122 cmd = "{} local {}".format(cmd, local_addr)
3123
3124 if learning == "no":
3125 cmd = "{} nolearning".format(cmd)
3126
3127 elif learning == "yes":
3128 cmd = "{} learning".format(cmd)
3129
3130 config_data.append(cmd)
3131
3132 try:
3133 for _cmd in config_data:
3134 logger.info("[DUT: %s]: Running command: %s", dut, _cmd)
3135 rnode.run(_cmd)
3136
3137 except InvalidCLIError:
3138 # Traceback
3139 errormsg = traceback.format_exc()
3140 logger.error(errormsg)
3141 return errormsg
3142
3143 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
3144
3145 return True
3146
3147
3148 def configure_brctl(tgen, topo, input_dict):
3149 """
3150 Add and configure brctl
3151
3152 * `tgen`: tgen object
3153 * `input_dict` : data for brctl config
3154
3155 Usage:
3156 ------
3157 input_dict= {
3158 dut:{
3159 "brctl": [{
3160 "brctl_name": "br100",
3161 "addvxlan": "vxlan75100",
3162 "vrf": "RED",
3163 "stp": "off"
3164 }]
3165 }
3166 }
3167
3168 configure_brctl(tgen, topo, input_dict)
3169
3170 Returns:
3171 -------
3172 True or errormsg
3173
3174 """
3175
3176 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
3177
3178 router_list = tgen.routers()
3179 for dut in input_dict.keys():
3180 rnode = router_list[dut]
3181
3182 if "brctl" in input_dict[dut]:
3183 for brctl_dict in input_dict[dut]["brctl"]:
3184
3185 brctl_names = brctl_dict.setdefault("brctl_name", [])
3186 addvxlans = brctl_dict.setdefault("addvxlan", [])
3187 stp_values = brctl_dict.setdefault("stp", [])
3188 vrfs = brctl_dict.setdefault("vrf", [])
3189
3190 ip_cmd = "ip link set"
3191 for brctl_name, vxlan, vrf, stp in zip(
3192 brctl_names, addvxlans, vrfs, stp_values
3193 ):
3194
3195 ip_cmd_list = []
3196 cmd = "ip link add name {} type bridge stp_state {}".format(
3197 brctl_name, stp
3198 )
3199
3200 logger.info("[DUT: %s]: Running command: %s", dut, cmd)
3201 rnode.run(cmd)
3202
3203 ip_cmd_list.append("{} up dev {}".format(ip_cmd, brctl_name))
3204
3205 if vxlan:
3206 cmd = "{} dev {} master {}".format(ip_cmd, vxlan, brctl_name)
3207
3208 logger.info("[DUT: %s]: Running command: %s", dut, cmd)
3209 rnode.run(cmd)
3210
3211 ip_cmd_list.append("{} up dev {}".format(ip_cmd, vxlan))
3212
3213 if vrf:
3214 ip_cmd_list.append(
3215 "{} dev {} master {}".format(ip_cmd, brctl_name, vrf)
3216 )
3217
3218 for intf_name, data in topo["routers"][dut]["links"].items():
3219 if "vrf" not in data:
3220 continue
3221
3222 if data["vrf"] == vrf:
3223 ip_cmd_list.append(
3224 "{} up dev {}".format(ip_cmd, data["interface"])
3225 )
3226
3227 try:
3228 for _ip_cmd in ip_cmd_list:
3229 logger.info("[DUT: %s]: Running command: %s", dut, _ip_cmd)
3230 rnode.run(_ip_cmd)
3231
3232 except InvalidCLIError:
3233 # Traceback
3234 errormsg = traceback.format_exc()
3235 logger.error(errormsg)
3236 return errormsg
3237
3238 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
3239 return True
3240
3241
3242 def configure_interface_mac(tgen, input_dict):
3243 """
3244 Add and configure brctl
3245
3246 * `tgen`: tgen object
3247 * `input_dict` : data for mac config
3248
3249 input_mac= {
3250 "edge1":{
3251 "br75100": "00:80:48:BA:d1:00,
3252 "br75200": "00:80:48:BA:d1:00
3253 }
3254 }
3255
3256 configure_interface_mac(tgen, input_mac)
3257
3258 Returns:
3259 -------
3260 True or errormsg
3261
3262 """
3263
3264 router_list = tgen.routers()
3265 for dut in input_dict.keys():
3266 rnode = router_list[dut]
3267
3268 for intf, mac in input_dict[dut].items():
3269 cmd = "ip link set {} address {}".format(intf, mac)
3270 logger.info("[DUT: %s]: Running command: %s", dut, cmd)
3271
3272 try:
3273 result = rnode.run(cmd)
3274 if len(result) != 0:
3275 return result
3276
3277 except InvalidCLIError:
3278 # Traceback
3279 errormsg = traceback.format_exc()
3280 logger.error(errormsg)
3281 return errormsg
3282
3283 return True
3284
3285
3286 def socat_send_mld_join(
3287 tgen,
3288 server,
3289 protocol_option,
3290 mld_groups,
3291 send_from_intf,
3292 send_from_intf_ip=None,
3293 port=12345,
3294 reuseaddr=True,
3295 ):
3296 """
3297 API to send MLD join using SOCAT tool
3298
3299 Parameters:
3300 -----------
3301 * `tgen` : Topogen object
3302 * `server`: iperf server, from where IGMP join would be sent
3303 * `protocol_option`: Protocol options, ex: UDP6-RECV
3304 * `mld_groups`: IGMP group for which join has to be sent
3305 * `send_from_intf`: Interface from which join would be sent
3306 * `send_from_intf_ip`: Interface IP, default is None
3307 * `port`: Port to be used, default is 12345
3308 * `reuseaddr`: True|False, bydefault True
3309
3310 returns:
3311 --------
3312 errormsg or True
3313 """
3314
3315 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
3316
3317 rnode = tgen.routers()[server]
3318 socat_args = "socat -u "
3319
3320 # UDP4/TCP4/UDP6/UDP6-RECV/UDP6-SEND
3321 if protocol_option:
3322 socat_args += "{}".format(protocol_option)
3323
3324 if port:
3325 socat_args += ":{},".format(port)
3326
3327 if reuseaddr:
3328 socat_args += "{},".format("reuseaddr")
3329
3330 # Group address range to cover
3331 if mld_groups:
3332 if not isinstance(mld_groups, list):
3333 mld_groups = [mld_groups]
3334
3335 for mld_group in mld_groups:
3336 socat_cmd = socat_args
3337 join_option = "ipv6-join-group"
3338
3339 if send_from_intf and not send_from_intf_ip:
3340 socat_cmd += "{}='[{}]:{}'".format(join_option, mld_group, send_from_intf)
3341 else:
3342 socat_cmd += "{}='[{}]:{}:[{}]'".format(
3343 join_option, mld_group, send_from_intf, send_from_intf_ip
3344 )
3345
3346 socat_cmd += " STDOUT"
3347
3348 socat_cmd += " &>{}/socat.logs &".format(tgen.logdir)
3349
3350 # Run socat command to send IGMP join
3351 logger.info("[DUT: {}]: Running command: [{}]".format(server, socat_cmd))
3352 output = rnode.run("set +m; {} sleep 0.5".format(socat_cmd))
3353
3354 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
3355 return True
3356
3357
3358 def socat_send_pim6_traffic(
3359 tgen,
3360 server,
3361 protocol_option,
3362 mld_groups,
3363 send_from_intf,
3364 port=12345,
3365 multicast_hops=True,
3366 ):
3367 """
3368 API to send pim6 data taffic using SOCAT tool
3369
3370 Parameters:
3371 -----------
3372 * `tgen` : Topogen object
3373 * `server`: iperf server, from where IGMP join would be sent
3374 * `protocol_option`: Protocol options, ex: UDP6-RECV
3375 * `mld_groups`: MLD group for which join has to be sent
3376 * `send_from_intf`: Interface from which join would be sent
3377 * `port`: Port to be used, default is 12345
3378 * `multicast_hops`: multicast-hops count, default is 255
3379
3380 returns:
3381 --------
3382 errormsg or True
3383 """
3384
3385 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
3386
3387 rnode = tgen.routers()[server]
3388 socat_args = "socat -u STDIO "
3389
3390 # UDP4/TCP4/UDP6/UDP6-RECV/UDP6-SEND
3391 if protocol_option:
3392 socat_args += "'{}".format(protocol_option)
3393
3394 # Group address range to cover
3395 if mld_groups:
3396 if not isinstance(mld_groups, list):
3397 mld_groups = [mld_groups]
3398
3399 for mld_group in mld_groups:
3400 socat_cmd = socat_args
3401 if port:
3402 socat_cmd += ":[{}]:{},".format(mld_group, port)
3403
3404 if send_from_intf:
3405 socat_cmd += "interface={0},so-bindtodevice={0},".format(send_from_intf)
3406
3407 if multicast_hops:
3408 socat_cmd += "multicast-hops=255'"
3409
3410 socat_cmd += " &>{}/socat.logs &".format(tgen.logdir)
3411
3412 # Run socat command to send pim6 traffic
3413 logger.info(
3414 "[DUT: {}]: Running command: [set +m; ( while sleep 1; do date; done ) | {}]".format(
3415 server, socat_cmd
3416 )
3417 )
3418
3419 # Open a shell script file and write data to it, which will be
3420 # used to send pim6 traffic continously
3421 traffic_shell_script = "{}/{}/traffic.sh".format(tgen.logdir, server)
3422 with open("{}".format(traffic_shell_script), "w") as taffic_sh:
3423 taffic_sh.write(
3424 "#!/usr/bin/env bash\n( while sleep 1; do date; done ) | {}\n".format(
3425 socat_cmd
3426 )
3427 )
3428
3429 rnode.run("chmod 755 {}".format(traffic_shell_script))
3430 output = rnode.run("{} &> /dev/null".format(traffic_shell_script))
3431
3432 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
3433 return True
3434
3435
3436 def kill_socat(tgen, dut=None, action=None):
3437 """
3438 Killing socat process if running for any router in topology
3439
3440 Parameters:
3441 -----------
3442 * `tgen` : Topogen object
3443 * `dut` : Any iperf hostname to send igmp prune
3444 * `action`: to kill mld join using socat
3445 to kill mld traffic using socat
3446
3447 Usage:
3448 ------
3449 kill_socat(tgen, dut ="i6", action="remove_mld_join")
3450
3451 """
3452
3453 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
3454
3455 router_list = tgen.routers()
3456 for router, rnode in router_list.items():
3457 if dut is not None and router != dut:
3458 continue
3459
3460 if action == "remove_mld_join":
3461 cmd = "ps -ef | grep socat | grep UDP6-RECV | grep {}".format(router)
3462 elif action == "remove_mld_traffic":
3463 cmd = "ps -ef | grep socat | grep UDP6-SEND | grep {}".format(router)
3464 else:
3465 cmd = "ps -ef | grep socat".format(router)
3466
3467 awk_cmd = "awk -F' ' '{print $2}' | xargs kill -9 &>/dev/null &"
3468 cmd = "{} | {}".format(cmd, awk_cmd)
3469
3470 logger.debug("[DUT: {}]: Running command: [{}]".format(router, cmd))
3471 rnode.run(cmd)
3472
3473 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
3474
3475
3476 #############################################
3477 # Verification APIs
3478 #############################################
3479 @retry(retry_timeout=40)
3480 def verify_rib(
3481 tgen,
3482 addr_type,
3483 dut,
3484 input_dict,
3485 next_hop=None,
3486 protocol=None,
3487 tag=None,
3488 metric=None,
3489 fib=None,
3490 count_only=False,
3491 admin_distance=None,
3492 ):
3493 """
3494 Data will be read from input_dict or input JSON file, API will generate
3495 same prefixes, which were redistributed by either create_static_routes() or
3496 advertise_networks_using_network_command() and do will verify next_hop and
3497 each prefix/routes is present in "show ip/ipv6 route {bgp/stataic} json"
3498 command o/p.
3499
3500 Parameters
3501 ----------
3502 * `tgen` : topogen object
3503 * `addr_type` : ip type, ipv4/ipv6
3504 * `dut`: Device Under Test, for which user wants to test the data
3505 * `input_dict` : input dict, has details of static routes
3506 * `next_hop`[optional]: next_hop which needs to be verified,
3507 default: static
3508 * `protocol`[optional]: protocol, default = None
3509 * `count_only`[optional]: count of nexthops only, not specific addresses,
3510 default = False
3511
3512 Usage
3513 -----
3514 # RIB can be verified for static routes OR network advertised using
3515 network command. Following are input_dicts to create static routes
3516 and advertise networks using network command. Any one of the input_dict
3517 can be passed to verify_rib() to verify routes in DUT"s RIB.
3518
3519 # Creating static routes for r1
3520 input_dict = {
3521 "r1": {
3522 "static_routes": [{"network": "10.0.20.1/32", "no_of_ip": 9, \
3523 "admin_distance": 100, "next_hop": "10.0.0.2", "tag": 4001}]
3524 }}
3525 # Advertising networks using network command in router r1
3526 input_dict = {
3527 "r1": {
3528 "advertise_networks": [{"start_ip": "20.0.0.0/32",
3529 "no_of_network": 10},
3530 {"start_ip": "30.0.0.0/32"}]
3531 }}
3532 # Verifying ipv4 routes in router r1 learned via BGP
3533 dut = "r2"
3534 protocol = "bgp"
3535 result = verify_rib(tgen, "ipv4", dut, input_dict, protocol = protocol)
3536
3537 Returns
3538 -------
3539 errormsg(str) or True
3540 """
3541
3542 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
3543
3544 router_list = tgen.routers()
3545 additional_nexthops_in_required_nhs = []
3546 found_hops = []
3547 for routerInput in input_dict.keys():
3548 for router, rnode in router_list.items():
3549 if router != dut:
3550 continue
3551
3552 logger.info("Checking router %s RIB:", router)
3553
3554 # Verifying RIB routes
3555 if addr_type == "ipv4":
3556 command = "show ip route"
3557 else:
3558 command = "show ipv6 route"
3559
3560 found_routes = []
3561 missing_routes = []
3562
3563 if "static_routes" in input_dict[routerInput]:
3564 static_routes = input_dict[routerInput]["static_routes"]
3565
3566 for static_route in static_routes:
3567 if "vrf" in static_route and static_route["vrf"] is not None:
3568
3569 logger.info(
3570 "[DUT: {}]: Verifying routes for VRF:"
3571 " {}".format(router, static_route["vrf"])
3572 )
3573
3574 cmd = "{} vrf {}".format(command, static_route["vrf"])
3575
3576 else:
3577 cmd = "{}".format(command)
3578
3579 if protocol:
3580 cmd = "{} {}".format(cmd, protocol)
3581
3582 cmd = "{} json".format(cmd)
3583
3584 rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True)
3585
3586 # Verifying output dictionary rib_routes_json is not empty
3587 if bool(rib_routes_json) is False:
3588 errormsg = "No route found in rib of router {}..".format(router)
3589 return errormsg
3590
3591 network = static_route["network"]
3592 if "no_of_ip" in static_route:
3593 no_of_ip = static_route["no_of_ip"]
3594 else:
3595 no_of_ip = 1
3596
3597 if "tag" in static_route:
3598 _tag = static_route["tag"]
3599 else:
3600 _tag = None
3601
3602 # Generating IPs for verification
3603 ip_list = generate_ips(network, no_of_ip)
3604 st_found = False
3605 nh_found = False
3606
3607 for st_rt in ip_list:
3608 st_rt = str(
3609 ipaddress.ip_network(frr_unicode(st_rt), strict=False)
3610 )
3611 _addr_type = validate_ip_address(st_rt)
3612 if _addr_type != addr_type:
3613 continue
3614
3615 if st_rt in rib_routes_json:
3616 st_found = True
3617 found_routes.append(st_rt)
3618
3619 if "queued" in rib_routes_json[st_rt][0]:
3620 errormsg = "Route {} is queued\n".format(st_rt)
3621 return errormsg
3622
3623 if fib and next_hop:
3624 if type(next_hop) is not list:
3625 next_hop = [next_hop]
3626
3627 for mnh in range(0, len(rib_routes_json[st_rt])):
3628 if not "selected" in rib_routes_json[st_rt][mnh]:
3629 continue
3630
3631 if (
3632 "fib"
3633 in rib_routes_json[st_rt][mnh]["nexthops"][0]
3634 ):
3635 found_hops.append(
3636 [
3637 rib_r["ip"]
3638 for rib_r in rib_routes_json[st_rt][
3639 mnh
3640 ]["nexthops"]
3641 ]
3642 )
3643
3644 if found_hops[0]:
3645 missing_list_of_nexthops = set(
3646 found_hops[0]
3647 ).difference(next_hop)
3648 additional_nexthops_in_required_nhs = set(
3649 next_hop
3650 ).difference(found_hops[0])
3651
3652 if additional_nexthops_in_required_nhs:
3653 logger.info(
3654 "Nexthop "
3655 "%s is not active for route %s in "
3656 "RIB of router %s\n",
3657 additional_nexthops_in_required_nhs,
3658 st_rt,
3659 dut,
3660 )
3661 errormsg = (
3662 "Nexthop {} is not active"
3663 " for route {} in RIB of router"
3664 " {}\n".format(
3665 additional_nexthops_in_required_nhs,
3666 st_rt,
3667 dut,
3668 )
3669 )
3670 return errormsg
3671 else:
3672 nh_found = True
3673
3674 elif next_hop and fib is None:
3675 if type(next_hop) is not list:
3676 next_hop = [next_hop]
3677 found_hops = [
3678 rib_r["ip"]
3679 for rib_r in rib_routes_json[st_rt][0]["nexthops"]
3680 if "ip" in rib_r
3681 ]
3682
3683 # If somehow key "ip" is not found in nexthops JSON
3684 # then found_hops would be 0, this particular
3685 # situation will be handled here
3686 if not len(found_hops):
3687 errormsg = (
3688 "Nexthop {} is Missing for "
3689 "route {} in RIB of router {}\n".format(
3690 next_hop,
3691 st_rt,
3692 dut,
3693 )
3694 )
3695 return errormsg
3696
3697 # Check only the count of nexthops
3698 if count_only:
3699 if len(next_hop) == len(found_hops):
3700 nh_found = True
3701 else:
3702 errormsg = (
3703 "Nexthops are missing for "
3704 "route {} in RIB of router {}: "
3705 "expected {}, found {}\n".format(
3706 st_rt,
3707 dut,
3708 len(next_hop),
3709 len(found_hops),
3710 )
3711 )
3712 return errormsg
3713
3714 # Check the actual nexthops
3715 elif found_hops:
3716 missing_list_of_nexthops = set(
3717 found_hops
3718 ).difference(next_hop)
3719 additional_nexthops_in_required_nhs = set(
3720 next_hop
3721 ).difference(found_hops)
3722
3723 if additional_nexthops_in_required_nhs:
3724 logger.info(
3725 "Missing nexthop %s for route"
3726 " %s in RIB of router %s\n",
3727 additional_nexthops_in_required_nhs,
3728 st_rt,
3729 dut,
3730 )
3731 errormsg = (
3732 "Nexthop {} is Missing for "
3733 "route {} in RIB of router {}\n".format(
3734 additional_nexthops_in_required_nhs,
3735 st_rt,
3736 dut,
3737 )
3738 )
3739 return errormsg
3740 else:
3741 nh_found = True
3742
3743 if tag:
3744 if "tag" not in rib_routes_json[st_rt][0]:
3745 errormsg = (
3746 "[DUT: {}]: tag is not"
3747 " present for"
3748 " route {} in RIB \n".format(dut, st_rt)
3749 )
3750 return errormsg
3751
3752 if _tag != rib_routes_json[st_rt][0]["tag"]:
3753 errormsg = (
3754 "[DUT: {}]: tag value {}"
3755 " is not matched for"
3756 " route {} in RIB \n".format(
3757 dut,
3758 _tag,
3759 st_rt,
3760 )
3761 )
3762 return errormsg
3763
3764 if admin_distance is not None:
3765 if "distance" not in rib_routes_json[st_rt][0]:
3766 errormsg = (
3767 "[DUT: {}]: admin distance is"
3768 " not present for"
3769 " route {} in RIB \n".format(dut, st_rt)
3770 )
3771 return errormsg
3772
3773 if (
3774 admin_distance
3775 != rib_routes_json[st_rt][0]["distance"]
3776 ):
3777 errormsg = (
3778 "[DUT: {}]: admin distance value "
3779 "{} is not matched for "
3780 "route {} in RIB \n".format(
3781 dut,
3782 admin_distance,
3783 st_rt,
3784 )
3785 )
3786 return errormsg
3787
3788 if metric is not None:
3789 if "metric" not in rib_routes_json[st_rt][0]:
3790 errormsg = (
3791 "[DUT: {}]: metric is"
3792 " not present for"
3793 " route {} in RIB \n".format(dut, st_rt)
3794 )
3795 return errormsg
3796
3797 if metric != rib_routes_json[st_rt][0]["metric"]:
3798 errormsg = (
3799 "[DUT: {}]: metric value "
3800 "{} is not matched for "
3801 "route {} in RIB \n".format(
3802 dut,
3803 metric,
3804 st_rt,
3805 )
3806 )
3807 return errormsg
3808
3809 else:
3810 missing_routes.append(st_rt)
3811
3812 if nh_found:
3813 logger.info(
3814 "[DUT: {}]: Found next_hop {} for"
3815 " RIB routes: {}".format(router, next_hop, found_routes)
3816 )
3817
3818 if len(missing_routes) > 0:
3819 errormsg = "[DUT: {}]: Missing route in RIB, " "routes: {}".format(
3820 dut, missing_routes
3821 )
3822 return errormsg
3823
3824 if found_routes:
3825 logger.info(
3826 "[DUT: %s]: Verified routes in RIB, found" " routes are: %s\n",
3827 dut,
3828 found_routes,
3829 )
3830
3831 continue
3832
3833 if "bgp" in input_dict[routerInput]:
3834 if (
3835 "advertise_networks"
3836 not in input_dict[routerInput]["bgp"]["address_family"][addr_type][
3837 "unicast"
3838 ]
3839 ):
3840 continue
3841
3842 found_routes = []
3843 missing_routes = []
3844 advertise_network = input_dict[routerInput]["bgp"]["address_family"][
3845 addr_type
3846 ]["unicast"]["advertise_networks"]
3847
3848 # Continue if there are no network advertise
3849 if len(advertise_network) == 0:
3850 continue
3851
3852 for advertise_network_dict in advertise_network:
3853 if "vrf" in advertise_network_dict:
3854 cmd = "{} vrf {} json".format(
3855 command, advertise_network_dict["vrf"]
3856 )
3857 else:
3858 cmd = "{} json".format(command)
3859
3860 rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True)
3861
3862 # Verifying output dictionary rib_routes_json is not empty
3863 if bool(rib_routes_json) is False:
3864 errormsg = "No route found in rib of router {}..".format(router)
3865 return errormsg
3866
3867 start_ip = advertise_network_dict["network"]
3868 if "no_of_network" in advertise_network_dict:
3869 no_of_network = advertise_network_dict["no_of_network"]
3870 else:
3871 no_of_network = 1
3872
3873 # Generating IPs for verification
3874 ip_list = generate_ips(start_ip, no_of_network)
3875 st_found = False
3876 nh_found = False
3877
3878 for st_rt in ip_list:
3879 st_rt = str(ipaddress.ip_network(frr_unicode(st_rt), strict=False))
3880
3881 _addr_type = validate_ip_address(st_rt)
3882 if _addr_type != addr_type:
3883 continue
3884
3885 if st_rt in rib_routes_json:
3886 st_found = True
3887 found_routes.append(st_rt)
3888
3889 if "queued" in rib_routes_json[st_rt][0]:
3890 errormsg = "Route {} is queued\n".format(st_rt)
3891 return errormsg
3892
3893 if next_hop:
3894 if type(next_hop) is not list:
3895 next_hop = [next_hop]
3896
3897 count = 0
3898 for nh in next_hop:
3899 for nh_dict in rib_routes_json[st_rt][0]["nexthops"]:
3900 if nh_dict["ip"] != nh:
3901 continue
3902 else:
3903 count += 1
3904
3905 if count == len(next_hop):
3906 nh_found = True
3907 else:
3908 errormsg = (
3909 "Nexthop {} is Missing"
3910 " for route {} in "
3911 "RIB of router {}\n".format(next_hop, st_rt, dut)
3912 )
3913 return errormsg
3914 else:
3915 missing_routes.append(st_rt)
3916
3917 if nh_found:
3918 logger.info(
3919 "Found next_hop {} for all routes in RIB"
3920 " of router {}\n".format(next_hop, dut)
3921 )
3922
3923 if len(missing_routes) > 0:
3924 errormsg = (
3925 "Missing {} route in RIB of router {}, "
3926 "routes: {} \n".format(addr_type, dut, missing_routes)
3927 )
3928 return errormsg
3929
3930 if found_routes:
3931 logger.info(
3932 "Verified {} routes in router {} RIB, found"
3933 " routes are: {}\n".format(addr_type, dut, found_routes)
3934 )
3935
3936 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
3937 return True
3938
3939
3940 @retry(retry_timeout=12)
3941 def verify_fib_routes(tgen, addr_type, dut, input_dict, next_hop=None, protocol=None):
3942 """
3943 Data will be read from input_dict or input JSON file, API will generate
3944 same prefixes, which were redistributed by either create_static_routes() or
3945 advertise_networks_using_network_command() and will verify next_hop and
3946 each prefix/routes is present in "show ip/ipv6 fib json"
3947 command o/p.
3948
3949 Parameters
3950 ----------
3951 * `tgen` : topogen object
3952 * `addr_type` : ip type, ipv4/ipv6
3953 * `dut`: Device Under Test, for which user wants to test the data
3954 * `input_dict` : input dict, has details of static routes
3955 * `next_hop`[optional]: next_hop which needs to be verified,
3956 default: static
3957
3958 Usage
3959 -----
3960 input_routes_r1 = {
3961 "r1": {
3962 "static_routes": [{
3963 "network": ["1.1.1.1/32],
3964 "next_hop": "Null0",
3965 "vrf": "RED"
3966 }]
3967 }
3968 }
3969 result = result = verify_fib_routes(tgen, "ipv4, "r1", input_routes_r1)
3970
3971 Returns
3972 -------
3973 errormsg(str) or True
3974 """
3975
3976 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
3977
3978 router_list = tgen.routers()
3979 if dut not in router_list:
3980 return
3981
3982 for routerInput in input_dict.keys():
3983 # XXX replace with router = dut; rnode = router_list[dut]
3984 for router, rnode in router_list.items():
3985 if router != dut:
3986 continue
3987
3988 logger.info("Checking router %s FIB routes:", router)
3989
3990 # Verifying RIB routes
3991 if addr_type == "ipv4":
3992 command = "show ip fib"
3993 else:
3994 command = "show ipv6 fib"
3995
3996 found_routes = []
3997 missing_routes = []
3998
3999 if protocol:
4000 command = "{} {}".format(command, protocol)
4001
4002 if "static_routes" in input_dict[routerInput]:
4003 static_routes = input_dict[routerInput]["static_routes"]
4004
4005 for static_route in static_routes:
4006 if "vrf" in static_route and static_route["vrf"] is not None:
4007
4008 logger.info(
4009 "[DUT: {}]: Verifying routes for VRF:"
4010 " {}".format(router, static_route["vrf"])
4011 )
4012
4013 cmd = "{} vrf {}".format(command, static_route["vrf"])
4014
4015 else:
4016 cmd = "{}".format(command)
4017
4018 cmd = "{} json".format(cmd)
4019
4020 rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True)
4021
4022 # Verifying output dictionary rib_routes_json is not empty
4023 if bool(rib_routes_json) is False:
4024 errormsg = "[DUT: {}]: No route found in fib".format(router)
4025 return errormsg
4026
4027 network = static_route["network"]
4028 if "no_of_ip" in static_route:
4029 no_of_ip = static_route["no_of_ip"]
4030 else:
4031 no_of_ip = 1
4032
4033 # Generating IPs for verification
4034 ip_list = generate_ips(network, no_of_ip)
4035 st_found = False
4036 nh_found = False
4037
4038 for st_rt in ip_list:
4039 st_rt = str(
4040 ipaddress.ip_network(frr_unicode(st_rt), strict=False)
4041 )
4042 _addr_type = validate_ip_address(st_rt)
4043 if _addr_type != addr_type:
4044 continue
4045
4046 if st_rt in rib_routes_json:
4047 st_found = True
4048 found_routes.append(st_rt)
4049
4050 if next_hop:
4051 if type(next_hop) is not list:
4052 next_hop = [next_hop]
4053
4054 count = 0
4055 for nh in next_hop:
4056 for nh_dict in rib_routes_json[st_rt][0][
4057 "nexthops"
4058 ]:
4059 if nh_dict["ip"] != nh:
4060 continue
4061 else:
4062 count += 1
4063
4064 if count == len(next_hop):
4065 nh_found = True
4066 else:
4067 missing_routes.append(st_rt)
4068 errormsg = (
4069 "Nexthop {} is Missing"
4070 " for route {} in "
4071 "RIB of router {}\n".format(
4072 next_hop, st_rt, dut
4073 )
4074 )
4075 return errormsg
4076
4077 else:
4078 missing_routes.append(st_rt)
4079
4080 if len(missing_routes) > 0:
4081 errormsg = "[DUT: {}]: Missing route in FIB:" " {}".format(
4082 dut, missing_routes
4083 )
4084 return errormsg
4085
4086 if nh_found:
4087 logger.info(
4088 "Found next_hop {} for all routes in RIB"
4089 " of router {}\n".format(next_hop, dut)
4090 )
4091
4092 if found_routes:
4093 logger.info(
4094 "[DUT: %s]: Verified routes in FIB, found" " routes are: %s\n",
4095 dut,
4096 found_routes,
4097 )
4098
4099 continue
4100
4101 if "bgp" in input_dict[routerInput]:
4102 if (
4103 "advertise_networks"
4104 not in input_dict[routerInput]["bgp"]["address_family"][addr_type][
4105 "unicast"
4106 ]
4107 ):
4108 continue
4109
4110 found_routes = []
4111 missing_routes = []
4112 advertise_network = input_dict[routerInput]["bgp"]["address_family"][
4113 addr_type
4114 ]["unicast"]["advertise_networks"]
4115
4116 # Continue if there are no network advertise
4117 if len(advertise_network) == 0:
4118 continue
4119
4120 for advertise_network_dict in advertise_network:
4121 if "vrf" in advertise_network_dict:
4122 cmd = "{} vrf {} json".format(command, static_route["vrf"])
4123 else:
4124 cmd = "{} json".format(command)
4125
4126 rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True)
4127
4128 # Verifying output dictionary rib_routes_json is not empty
4129 if bool(rib_routes_json) is False:
4130 errormsg = "No route found in rib of router {}..".format(router)
4131 return errormsg
4132
4133 start_ip = advertise_network_dict["network"]
4134 if "no_of_network" in advertise_network_dict:
4135 no_of_network = advertise_network_dict["no_of_network"]
4136 else:
4137 no_of_network = 1
4138
4139 # Generating IPs for verification
4140 ip_list = generate_ips(start_ip, no_of_network)
4141 st_found = False
4142 nh_found = False
4143
4144 for st_rt in ip_list:
4145 st_rt = str(ipaddress.ip_network(frr_unicode(st_rt), strict=False))
4146
4147 _addr_type = validate_ip_address(st_rt)
4148 if _addr_type != addr_type:
4149 continue
4150
4151 if st_rt in rib_routes_json:
4152 st_found = True
4153 found_routes.append(st_rt)
4154
4155 if next_hop:
4156 if type(next_hop) is not list:
4157 next_hop = [next_hop]
4158
4159 count = 0
4160 for nh in next_hop:
4161 for nh_dict in rib_routes_json[st_rt][0]["nexthops"]:
4162 if nh_dict["ip"] != nh:
4163 continue
4164 else:
4165 count += 1
4166
4167 if count == len(next_hop):
4168 nh_found = True
4169 else:
4170 missing_routes.append(st_rt)
4171 errormsg = (
4172 "Nexthop {} is Missing"
4173 " for route {} in "
4174 "RIB of router {}\n".format(next_hop, st_rt, dut)
4175 )
4176 return errormsg
4177 else:
4178 missing_routes.append(st_rt)
4179
4180 if len(missing_routes) > 0:
4181 errormsg = "[DUT: {}]: Missing route in FIB: " "{} \n".format(
4182 dut, missing_routes
4183 )
4184 return errormsg
4185
4186 if nh_found:
4187 logger.info(
4188 "Found next_hop {} for all routes in RIB"
4189 " of router {}\n".format(next_hop, dut)
4190 )
4191
4192 if found_routes:
4193 logger.info(
4194 "[DUT: {}]: Verified routes FIB"
4195 ", found routes are: {}\n".format(dut, found_routes)
4196 )
4197
4198 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
4199 return True
4200
4201
4202 def verify_admin_distance_for_static_routes(tgen, input_dict):
4203 """
4204 API to verify admin distance for static routes as defined in input_dict/
4205 input JSON by running show ip/ipv6 route json command.
4206 Parameter
4207 ---------
4208 * `tgen` : topogen object
4209 * `input_dict`: having details like - for which router and static routes
4210 admin dsitance needs to be verified
4211 Usage
4212 -----
4213 # To verify admin distance is 10 for prefix 10.0.20.1/32 having next_hop
4214 10.0.0.2 in router r1
4215 input_dict = {
4216 "r1": {
4217 "static_routes": [{
4218 "network": "10.0.20.1/32",
4219 "admin_distance": 10,
4220 "next_hop": "10.0.0.2"
4221 }]
4222 }
4223 }
4224 result = verify_admin_distance_for_static_routes(tgen, input_dict)
4225 Returns
4226 -------
4227 errormsg(str) or True
4228 """
4229
4230 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
4231
4232 router_list = tgen.routers()
4233 for router in input_dict.keys():
4234 if router not in router_list:
4235 continue
4236 rnode = router_list[router]
4237
4238 for static_route in input_dict[router]["static_routes"]:
4239 addr_type = validate_ip_address(static_route["network"])
4240 # Command to execute
4241 if addr_type == "ipv4":
4242 command = "show ip route json"
4243 else:
4244 command = "show ipv6 route json"
4245 show_ip_route_json = run_frr_cmd(rnode, command, isjson=True)
4246
4247 logger.info(
4248 "Verifying admin distance for static route %s" " under dut %s:",
4249 static_route,
4250 router,
4251 )
4252 network = static_route["network"]
4253 next_hop = static_route["next_hop"]
4254 admin_distance = static_route["admin_distance"]
4255 route_data = show_ip_route_json[network][0]
4256 if network in show_ip_route_json:
4257 if route_data["nexthops"][0]["ip"] == next_hop:
4258 if route_data["distance"] != admin_distance:
4259 errormsg = (
4260 "Verification failed: admin distance"
4261 " for static route {} under dut {},"
4262 " found:{} but expected:{}".format(
4263 static_route,
4264 router,
4265 route_data["distance"],
4266 admin_distance,
4267 )
4268 )
4269 return errormsg
4270 else:
4271 logger.info(
4272 "Verification successful: admin"
4273 " distance for static route %s under"
4274 " dut %s, found:%s",
4275 static_route,
4276 router,
4277 route_data["distance"],
4278 )
4279
4280 else:
4281 errormsg = (
4282 "Static route {} not found in "
4283 "show_ip_route_json for dut {}".format(network, router)
4284 )
4285 return errormsg
4286
4287 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
4288 return True
4289
4290
4291 def verify_prefix_lists(tgen, input_dict):
4292 """
4293 Running "show ip prefix-list" command and verifying given prefix-list
4294 is present in router.
4295 Parameters
4296 ----------
4297 * `tgen` : topogen object
4298 * `input_dict`: data to verify prefix lists
4299 Usage
4300 -----
4301 # To verify pf_list_1 is present in router r1
4302 input_dict = {
4303 "r1": {
4304 "prefix_lists": ["pf_list_1"]
4305 }}
4306 result = verify_prefix_lists("ipv4", input_dict, tgen)
4307 Returns
4308 -------
4309 errormsg(str) or True
4310 """
4311
4312 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
4313
4314 router_list = tgen.routers()
4315 for router in input_dict.keys():
4316 if router not in router_list:
4317 continue
4318
4319 rnode = router_list[router]
4320
4321 # Show ip prefix list
4322 show_prefix_list = run_frr_cmd(rnode, "show ip prefix-list")
4323
4324 # Verify Prefix list is deleted
4325 prefix_lists_addr = input_dict[router]["prefix_lists"]
4326 for addr_type in prefix_lists_addr:
4327 if not check_address_types(addr_type):
4328 continue
4329 # show ip prefix list
4330 if addr_type == "ipv4":
4331 cmd = "show ip prefix-list"
4332 else:
4333 cmd = "show {} prefix-list".format(addr_type)
4334 show_prefix_list = run_frr_cmd(rnode, cmd)
4335 for prefix_list in prefix_lists_addr[addr_type].keys():
4336 if prefix_list in show_prefix_list:
4337 errormsg = (
4338 "Prefix list {} is/are present in the router"
4339 " {}".format(prefix_list, router)
4340 )
4341 return errormsg
4342
4343 logger.info(
4344 "Prefix list %s is/are not present in the router" " from router %s",
4345 prefix_list,
4346 router,
4347 )
4348
4349 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
4350 return True
4351
4352
4353 @retry(retry_timeout=12)
4354 def verify_route_maps(tgen, input_dict):
4355 """
4356 Running "show route-map" command and verifying given route-map
4357 is present in router.
4358 Parameters
4359 ----------
4360 * `tgen` : topogen object
4361 * `input_dict`: data to verify prefix lists
4362 Usage
4363 -----
4364 # To verify rmap_1 and rmap_2 are present in router r1
4365 input_dict = {
4366 "r1": {
4367 "route_maps": ["rmap_1", "rmap_2"]
4368 }
4369 }
4370 result = verify_route_maps(tgen, input_dict)
4371 Returns
4372 -------
4373 errormsg(str) or True
4374 """
4375
4376 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
4377
4378 router_list = tgen.routers()
4379 for router in input_dict.keys():
4380 if router not in router_list:
4381 continue
4382
4383 rnode = router_list[router]
4384 # Show ip route-map
4385 show_route_maps = rnode.vtysh_cmd("show route-map")
4386
4387 # Verify route-map is deleted
4388 route_maps = input_dict[router]["route_maps"]
4389 for route_map in route_maps:
4390 if route_map in show_route_maps:
4391 errormsg = "Route map {} is not deleted from router" " {}".format(
4392 route_map, router
4393 )
4394 return errormsg
4395
4396 logger.info(
4397 "Route map %s is/are deleted successfully from" " router %s",
4398 route_maps,
4399 router,
4400 )
4401
4402 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
4403 return True
4404
4405
4406 @retry(retry_timeout=16)
4407 def verify_bgp_community(tgen, addr_type, router, network, input_dict=None):
4408 """
4409 API to veiryf BGP large community is attached in route for any given
4410 DUT by running "show bgp ipv4/6 {route address} json" command.
4411 Parameters
4412 ----------
4413 * `tgen`: topogen object
4414 * `addr_type` : ip type, ipv4/ipv6
4415 * `dut`: Device Under Test
4416 * `network`: network for which set criteria needs to be verified
4417 * `input_dict`: having details like - for which router, community and
4418 values needs to be verified
4419 Usage
4420 -----
4421 networks = ["200.50.2.0/32"]
4422 input_dict = {
4423 "largeCommunity": "2:1:1 2:2:2 2:3:3 2:4:4 2:5:5"
4424 }
4425 result = verify_bgp_community(tgen, "ipv4", dut, network, input_dict=None)
4426 Returns
4427 -------
4428 errormsg(str) or True
4429 """
4430
4431 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
4432 router_list = tgen.routers()
4433 if router not in router_list:
4434 return False
4435
4436 rnode = router_list[router]
4437
4438 logger.debug(
4439 "Verifying BGP community attributes on dut %s: for %s " "network %s",
4440 router,
4441 addr_type,
4442 network,
4443 )
4444
4445 for net in network:
4446 cmd = "show bgp {} {} json".format(addr_type, net)
4447 show_bgp_json = rnode.vtysh_cmd(cmd, isjson=True)
4448 logger.info(show_bgp_json)
4449 if "paths" not in show_bgp_json:
4450 return "Prefix {} not found in BGP table of router: {}".format(net, router)
4451
4452 as_paths = show_bgp_json["paths"]
4453 found = False
4454 for i in range(len(as_paths)):
4455 if (
4456 "largeCommunity" in show_bgp_json["paths"][i]
4457 or "community" in show_bgp_json["paths"][i]
4458 ):
4459 found = True
4460 logger.info(
4461 "Large Community attribute is found for route:" " %s in router: %s",
4462 net,
4463 router,
4464 )
4465 if input_dict is not None:
4466 for criteria, comm_val in input_dict.items():
4467 show_val = show_bgp_json["paths"][i][criteria]["string"]
4468 if comm_val == show_val:
4469 logger.info(
4470 "Verifying BGP %s for prefix: %s"
4471 " in router: %s, found expected"
4472 " value: %s",
4473 criteria,
4474 net,
4475 router,
4476 comm_val,
4477 )
4478 else:
4479 errormsg = (
4480 "Failed: Verifying BGP attribute"
4481 " {} for route: {} in router: {}"
4482 ", expected value: {} but found"
4483 ": {}".format(criteria, net, router, comm_val, show_val)
4484 )
4485 return errormsg
4486
4487 if not found:
4488 errormsg = (
4489 "Large Community attribute is not found for route: "
4490 "{} in router: {} ".format(net, router)
4491 )
4492 return errormsg
4493
4494 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
4495 return True
4496
4497
4498 def get_ipv6_linklocal_address(topo, node, intf):
4499 """
4500 API to get the link local ipv6 address of a particular interface
4501
4502 Parameters
4503 ----------
4504 * `node`: node on which link local ip to be fetched.
4505 * `intf` : interface for which link local ip needs to be returned.
4506 * `topo` : base topo
4507
4508 Usage
4509 -----
4510 result = get_ipv6_linklocal_address(topo, 'r1', 'r2')
4511
4512 Returns link local ip of interface between r1 and r2.
4513
4514 Returns
4515 -------
4516 1) link local ipv6 address from the interface
4517 2) errormsg - when link local ip not found
4518 """
4519 tgen = get_topogen()
4520 ext_nh = tgen.net[node].get_ipv6_linklocal()
4521 req_nh = topo[node]["links"][intf]["interface"]
4522 llip = None
4523 for llips in ext_nh:
4524 if llips[0] == req_nh:
4525 llip = llips[1]
4526 logger.info("Link local ip found = %s", llip)
4527 return llip
4528
4529 errormsg = "Failed: Link local ip not found on router {}, " "interface {}".format(
4530 node, intf
4531 )
4532
4533 return errormsg
4534
4535
4536 def verify_create_community_list(tgen, input_dict):
4537 """
4538 API is to verify if large community list is created for any given DUT in
4539 input_dict by running "sh bgp large-community-list {"comm_name"} detail"
4540 command.
4541 Parameters
4542 ----------
4543 * `tgen`: topogen object
4544 * `input_dict`: having details like - for which router, large community
4545 needs to be verified
4546 Usage
4547 -----
4548 input_dict = {
4549 "r1": {
4550 "large-community-list": {
4551 "standard": {
4552 "Test1": [{"action": "PERMIT", "attribute":\
4553 ""}]
4554 }}}}
4555 result = verify_create_community_list(tgen, input_dict)
4556 Returns
4557 -------
4558 errormsg(str) or True
4559 """
4560
4561 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
4562
4563 router_list = tgen.routers()
4564 for router in input_dict.keys():
4565 if router not in router_list:
4566 continue
4567
4568 rnode = router_list[router]
4569
4570 logger.info("Verifying large-community is created for dut %s:", router)
4571
4572 for comm_data in input_dict[router]["bgp_community_lists"]:
4573 comm_name = comm_data["name"]
4574 comm_type = comm_data["community_type"]
4575 show_bgp_community = run_frr_cmd(
4576 rnode, "show bgp large-community-list {} detail".format(comm_name)
4577 )
4578
4579 # Verify community list and type
4580 if comm_name in show_bgp_community and comm_type in show_bgp_community:
4581 logger.info(
4582 "BGP %s large-community-list %s is" " created", comm_type, comm_name
4583 )
4584 else:
4585 errormsg = "BGP {} large-community-list {} is not" " created".format(
4586 comm_type, comm_name
4587 )
4588 return errormsg
4589
4590 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
4591 return True
4592
4593
4594 def verify_cli_json(tgen, input_dict):
4595 """
4596 API to verify if JSON is available for clis
4597 command.
4598 Parameters
4599 ----------
4600 * `tgen`: topogen object
4601 * `input_dict`: CLIs for which JSON needs to be verified
4602 Usage
4603 -----
4604 input_dict = {
4605 "edge1":{
4606 "cli": ["show evpn vni detail", show evpn rmac vni all]
4607 }
4608 }
4609
4610 result = verify_cli_json(tgen, input_dict)
4611
4612 Returns
4613 -------
4614 errormsg(str) or True
4615 """
4616
4617 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
4618 for dut in input_dict.keys():
4619 rnode = tgen.gears[dut]
4620
4621 for cli in input_dict[dut]["cli"]:
4622 logger.info(
4623 "[DUT: %s]: Verifying JSON is available for " "CLI %s :", dut, cli
4624 )
4625
4626 test_cli = "{} json".format(cli)
4627 ret_json = rnode.vtysh_cmd(test_cli, isjson=True)
4628 if not bool(ret_json):
4629 errormsg = "CLI: %s, JSON format is not available" % (cli)
4630 return errormsg
4631 elif "unknown" in ret_json or "Unknown" in ret_json:
4632 errormsg = "CLI: %s, JSON format is not available" % (cli)
4633 return errormsg
4634 else:
4635 logger.info(
4636 "CLI : %s JSON format is available: " "\n %s", cli, ret_json
4637 )
4638
4639 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
4640
4641 return True
4642
4643
4644 @retry(retry_timeout=12)
4645 def verify_evpn_vni(tgen, input_dict):
4646 """
4647 API to verify evpn vni details using "show evpn vni detail json"
4648 command.
4649
4650 Parameters
4651 ----------
4652 * `tgen`: topogen object
4653 * `input_dict`: having details like - for which router, evpn details
4654 needs to be verified
4655 Usage
4656 -----
4657 input_dict = {
4658 "edge1":{
4659 "vni": [
4660 {
4661 "75100":{
4662 "vrf": "RED",
4663 "vxlanIntf": "vxlan75100",
4664 "localVtepIp": "120.1.1.1",
4665 "sviIntf": "br100"
4666 }
4667 }
4668 ]
4669 }
4670 }
4671
4672 result = verify_evpn_vni(tgen, input_dict)
4673
4674 Returns
4675 -------
4676 errormsg(str) or True
4677 """
4678
4679 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
4680 for dut in input_dict.keys():
4681 rnode = tgen.gears[dut]
4682
4683 logger.info("[DUT: %s]: Verifying evpn vni details :", dut)
4684
4685 cmd = "show evpn vni detail json"
4686 evpn_all_vni_json = run_frr_cmd(rnode, cmd, isjson=True)
4687 if not bool(evpn_all_vni_json):
4688 errormsg = "No output for '{}' cli".format(cmd)
4689 return errormsg
4690
4691 if "vni" in input_dict[dut]:
4692 for vni_dict in input_dict[dut]["vni"]:
4693 found = False
4694 vni = vni_dict["name"]
4695 for evpn_vni_json in evpn_all_vni_json:
4696 if "vni" in evpn_vni_json:
4697 if evpn_vni_json["vni"] != int(vni):
4698 continue
4699
4700 for attribute in vni_dict.keys():
4701 if vni_dict[attribute] != evpn_vni_json[attribute]:
4702 errormsg = (
4703 "[DUT: %s] Verifying "
4704 "%s for VNI: %s [FAILED]||"
4705 ", EXPECTED : %s "
4706 " FOUND : %s"
4707 % (
4708 dut,
4709 attribute,
4710 vni,
4711 vni_dict[attribute],
4712 evpn_vni_json[attribute],
4713 )
4714 )
4715 return errormsg
4716
4717 else:
4718 found = True
4719 logger.info(
4720 "[DUT: %s] Verifying"
4721 " %s for VNI: %s , "
4722 "Found Expected : %s ",
4723 dut,
4724 attribute,
4725 vni,
4726 evpn_vni_json[attribute],
4727 )
4728
4729 if evpn_vni_json["state"] != "Up":
4730 errormsg = (
4731 "[DUT: %s] Failed: Verifying"
4732 " State for VNI: %s is not Up" % (dut, vni)
4733 )
4734 return errormsg
4735
4736 else:
4737 errormsg = (
4738 "[DUT: %s] Failed:"
4739 " VNI: %s is not present in JSON" % (dut, vni)
4740 )
4741 return errormsg
4742
4743 if found:
4744 logger.info(
4745 "[DUT %s]: Verifying VNI : %s "
4746 "details and state is Up [PASSED]!!",
4747 dut,
4748 vni,
4749 )
4750 return True
4751
4752 else:
4753 errormsg = (
4754 "[DUT: %s] Failed:" " vni details are not present in input data" % (dut)
4755 )
4756 return errormsg
4757
4758 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
4759 return False
4760
4761
4762 @retry(retry_timeout=12)
4763 def verify_vrf_vni(tgen, input_dict):
4764 """
4765 API to verify vrf vni details using "show vrf vni json"
4766 command.
4767 Parameters
4768 ----------
4769 * `tgen`: topogen object
4770 * `input_dict`: having details like - for which router, evpn details
4771 needs to be verified
4772 Usage
4773 -----
4774 input_dict = {
4775 "edge1":{
4776 "vrfs": [
4777 {
4778 "RED":{
4779 "vni": 75000,
4780 "vxlanIntf": "vxlan75100",
4781 "sviIntf": "br100",
4782 "routerMac": "00:80:48:ba:d1:00",
4783 "state": "Up"
4784 }
4785 }
4786 ]
4787 }
4788 }
4789
4790 result = verify_vrf_vni(tgen, input_dict)
4791
4792 Returns
4793 -------
4794 errormsg(str) or True
4795 """
4796
4797 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
4798 for dut in input_dict.keys():
4799 rnode = tgen.gears[dut]
4800
4801 logger.info("[DUT: %s]: Verifying vrf vni details :", dut)
4802
4803 cmd = "show vrf vni json"
4804 vrf_all_vni_json = run_frr_cmd(rnode, cmd, isjson=True)
4805 if not bool(vrf_all_vni_json):
4806 errormsg = "No output for '{}' cli".format(cmd)
4807 return errormsg
4808
4809 if "vrfs" in input_dict[dut]:
4810 for vrfs in input_dict[dut]["vrfs"]:
4811 for vrf, vrf_dict in vrfs.items():
4812 found = False
4813 for vrf_vni_json in vrf_all_vni_json["vrfs"]:
4814 if "vrf" in vrf_vni_json:
4815 if vrf_vni_json["vrf"] != vrf:
4816 continue
4817
4818 for attribute in vrf_dict.keys():
4819 if vrf_dict[attribute] == vrf_vni_json[attribute]:
4820 found = True
4821 logger.info(
4822 "[DUT %s]: VRF: %s, "
4823 "verifying %s "
4824 ", Found Expected: %s "
4825 "[PASSED]!!",
4826 dut,
4827 vrf,
4828 attribute,
4829 vrf_vni_json[attribute],
4830 )
4831 else:
4832 errormsg = (
4833 "[DUT: %s] VRF: %s, "
4834 "verifying %s [FAILED!!] "
4835 ", EXPECTED : %s "
4836 ", FOUND : %s"
4837 % (
4838 dut,
4839 vrf,
4840 attribute,
4841 vrf_dict[attribute],
4842 vrf_vni_json[attribute],
4843 )
4844 )
4845 return errormsg
4846
4847 else:
4848 errormsg = "[DUT: %s] VRF: %s " "is not present in JSON" % (
4849 dut,
4850 vrf,
4851 )
4852 return errormsg
4853
4854 if found:
4855 logger.info(
4856 "[DUT %s] Verifying VRF: %s " " details [PASSED]!!",
4857 dut,
4858 vrf,
4859 )
4860 return True
4861
4862 else:
4863 errormsg = (
4864 "[DUT: %s] Failed:" " vrf details are not present in input data" % (dut)
4865 )
4866 return errormsg
4867
4868 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
4869 return False
4870
4871
4872 def required_linux_kernel_version(required_version):
4873 """
4874 This API is used to check linux version compatibility of the test suite.
4875 If version mentioned in required_version is higher than the linux kernel
4876 of the system, test suite will be skipped. This API returns true or errormsg.
4877
4878 Parameters
4879 ----------
4880 * `required_version` : Kernel version required for the suites to run.
4881
4882 Usage
4883 -----
4884 result = linux_kernel_version_lowerthan('4.15')
4885
4886 Returns
4887 -------
4888 errormsg(str) or True
4889 """
4890 system_kernel = platform.release()
4891 if version_cmp(system_kernel, required_version) < 0:
4892 error_msg = (
4893 'These tests will not run on kernel "{}", '
4894 "they require kernel >= {})".format(system_kernel, required_version)
4895 )
4896
4897 logger.info(error_msg)
4898
4899 return error_msg
4900 return True
4901
4902
4903 class HostApplicationHelper(object):
4904 """Helper to track and cleanup per-host based test processes."""
4905
4906 def __init__(self, tgen=None, base_cmd=None):
4907 self.base_cmd_str = ""
4908 self.host_procs = {}
4909 self.tgen = None
4910 self.set_base_cmd(base_cmd if base_cmd else [])
4911 if tgen is not None:
4912 self.init(tgen)
4913
4914 def __enter__(self):
4915 self.init()
4916 return self
4917
4918 def __exit__(self, type, value, traceback):
4919 self.cleanup()
4920
4921 def __str__(self):
4922 return "HostApplicationHelper({})".format(self.base_cmd_str)
4923
4924 def set_base_cmd(self, base_cmd):
4925 assert isinstance(base_cmd, list) or isinstance(base_cmd, tuple)
4926 self.base_cmd = base_cmd
4927 if base_cmd:
4928 self.base_cmd_str = " ".join(base_cmd)
4929 else:
4930 self.base_cmd_str = ""
4931
4932 def init(self, tgen=None):
4933 """Initialize the helper with tgen if needed.
4934
4935 If overridden, need to handle multiple entries but one init. Will be called on
4936 object creation if tgen is supplied. Will be called again on __enter__ so should
4937 not re-init if already inited.
4938 """
4939 if self.tgen:
4940 assert tgen is None or self.tgen == tgen
4941 else:
4942 self.tgen = tgen
4943
4944 def started_proc(self, host, p):
4945 """Called after process started on host.
4946
4947 Return value is passed to `stopping_proc` method."""
4948 logger.debug("%s: Doing nothing after starting process", self)
4949 return False
4950
4951 def stopping_proc(self, host, p, info):
4952 """Called after process started on host."""
4953 logger.debug("%s: Doing nothing before stopping process", self)
4954
4955 def _add_host_proc(self, host, p):
4956 v = self.started_proc(host, p)
4957
4958 if host not in self.host_procs:
4959 self.host_procs[host] = []
4960 logger.debug("%s: %s: tracking process %s", self, host, p)
4961 self.host_procs[host].append((p, v))
4962
4963 def stop_host(self, host):
4964 """Stop the process on the host.
4965
4966 Override to do additional cleanup."""
4967 if host in self.host_procs:
4968 hlogger = self.tgen.net[host].logger
4969 for p, v in self.host_procs[host]:
4970 self.stopping_proc(host, p, v)
4971 logger.debug("%s: %s: terminating process %s", self, host, p.pid)
4972 hlogger.debug("%s: %s: terminating process %s", self, host, p.pid)
4973 rc = p.poll()
4974 if rc is not None:
4975 logger.error(
4976 "%s: %s: process early exit %s: %s",
4977 self,
4978 host,
4979 p.pid,
4980 comm_error(p),
4981 )
4982 hlogger.error(
4983 "%s: %s: process early exit %s: %s",
4984 self,
4985 host,
4986 p.pid,
4987 comm_error(p),
4988 )
4989 else:
4990 p.terminate()
4991 p.wait()
4992 logger.debug(
4993 "%s: %s: terminated process %s: %s",
4994 self,
4995 host,
4996 p.pid,
4997 comm_error(p),
4998 )
4999 hlogger.debug(
5000 "%s: %s: terminated process %s: %s",
5001 self,
5002 host,
5003 p.pid,
5004 comm_error(p),
5005 )
5006
5007 del self.host_procs[host]
5008
5009 def stop_all_hosts(self):
5010 hosts = set(self.host_procs)
5011 for host in hosts:
5012 self.stop_host(host)
5013
5014 def cleanup(self):
5015 self.stop_all_hosts()
5016
5017 def run(self, host, cmd_args, **kwargs):
5018 cmd = list(self.base_cmd)
5019 cmd.extend(cmd_args)
5020 p = self.tgen.gears[host].popen(cmd, **kwargs)
5021 assert p.poll() is None
5022 self._add_host_proc(host, p)
5023 return p
5024
5025 def check_procs(self):
5026 """Check that all current processes are running, log errors if not.
5027
5028 Returns: List of stopped processes."""
5029 procs = []
5030
5031 logger.debug("%s: checking procs on hosts %s", self, self.host_procs.keys())
5032
5033 for host in self.host_procs:
5034 hlogger = self.tgen.net[host].logger
5035 for p, _ in self.host_procs[host]:
5036 logger.debug("%s: checking %s proc %s", self, host, p)
5037 rc = p.poll()
5038 if rc is None:
5039 continue
5040 logger.error(
5041 "%s: %s proc exited: %s", self, host, comm_error(p), exc_info=True
5042 )
5043 hlogger.error(
5044 "%s: %s proc exited: %s", self, host, comm_error(p), exc_info=True
5045 )
5046 procs.append(p)
5047 return procs
5048
5049
5050 class IPerfHelper(HostApplicationHelper):
5051 def __str__(self):
5052 return "IPerfHelper()"
5053
5054 def run_join(
5055 self,
5056 host,
5057 join_addr,
5058 l4Type="UDP",
5059 join_interval=1,
5060 join_intf=None,
5061 join_towards=None,
5062 ):
5063 """
5064 Use iperf to send IGMP join and listen to traffic
5065
5066 Parameters:
5067 -----------
5068 * `host`: iperf host from where IGMP join would be sent
5069 * `l4Type`: string, one of [ TCP, UDP ]
5070 * `join_addr`: multicast address (or addresses) to join to
5071 * `join_interval`: seconds between periodic bandwidth reports
5072 * `join_intf`: the interface to bind the join to
5073 * `join_towards`: router whos interface to bind the join to
5074
5075 returns: Success (bool)
5076 """
5077
5078 iperf_path = self.tgen.net.get_exec_path("iperf")
5079
5080 assert join_addr
5081 if not isinstance(join_addr, list) and not isinstance(join_addr, tuple):
5082 join_addr = [ipaddress.IPv4Address(frr_unicode(join_addr))]
5083
5084 for bindTo in join_addr:
5085 iperf_args = [iperf_path, "-s"]
5086
5087 if l4Type == "UDP":
5088 iperf_args.append("-u")
5089
5090 iperf_args.append("-B")
5091 if join_towards:
5092 to_intf = frr_unicode(
5093 self.tgen.json_topo["routers"][host]["links"][join_towards][
5094 "interface"
5095 ]
5096 )
5097 iperf_args.append("{}%{}".format(str(bindTo), to_intf))
5098 elif join_intf:
5099 iperf_args.append("{}%{}".format(str(bindTo), join_intf))
5100 else:
5101 iperf_args.append(str(bindTo))
5102
5103 if join_interval:
5104 iperf_args.append("-i")
5105 iperf_args.append(str(join_interval))
5106
5107 p = self.run(host, iperf_args)
5108 if p.poll() is not None:
5109 logger.error("IGMP join failed on %s: %s", bindTo, comm_error(p))
5110 return False
5111 return True
5112
5113 def run_traffic(
5114 self, host, sentToAddress, ttl, time=0, l4Type="UDP", bind_towards=None
5115 ):
5116 """
5117 Run iperf to send IGMP join and traffic
5118
5119 Parameters:
5120 -----------
5121 * `host`: iperf host to send traffic from
5122 * `l4Type`: string, one of [ TCP, UDP ]
5123 * `sentToAddress`: multicast address to send traffic to
5124 * `ttl`: time to live
5125 * `time`: time in seconds to transmit for
5126 * `bind_towards`: Router who's interface the source ip address is got from
5127
5128 returns: Success (bool)
5129 """
5130
5131 iperf_path = self.tgen.net.get_exec_path("iperf")
5132
5133 if sentToAddress and not isinstance(sentToAddress, list):
5134 sentToAddress = [ipaddress.IPv4Address(frr_unicode(sentToAddress))]
5135
5136 for sendTo in sentToAddress:
5137 iperf_args = [iperf_path, "-c", sendTo]
5138
5139 # Bind to Interface IP
5140 if bind_towards:
5141 ifaddr = frr_unicode(
5142 self.tgen.json_topo["routers"][host]["links"][bind_towards]["ipv4"]
5143 )
5144 ipaddr = ipaddress.IPv4Interface(ifaddr).ip
5145 iperf_args.append("-B")
5146 iperf_args.append(str(ipaddr))
5147
5148 # UDP/TCP
5149 if l4Type == "UDP":
5150 iperf_args.append("-u")
5151 iperf_args.append("-b")
5152 iperf_args.append("0.012m")
5153
5154 # TTL
5155 if ttl:
5156 iperf_args.append("-T")
5157 iperf_args.append(str(ttl))
5158
5159 # Time
5160 if time:
5161 iperf_args.append("-t")
5162 iperf_args.append(str(time))
5163
5164 p = self.run(host, iperf_args)
5165 if p.poll() is not None:
5166 logger.error(
5167 "mcast traffic send failed for %s: %s", sendTo, comm_error(p)
5168 )
5169 return False
5170
5171 return True
5172
5173
5174 def verify_ip_nht(tgen, input_dict):
5175 """
5176 Running "show ip nht" command and verifying given nexthop resolution
5177 Parameters
5178 ----------
5179 * `tgen` : topogen object
5180 * `input_dict`: data to verify nexthop
5181 Usage
5182 -----
5183 input_dict_4 = {
5184 "r1": {
5185 nh: {
5186 "Address": nh,
5187 "resolvedVia": "connected",
5188 "nexthops": {
5189 "nexthop1": {
5190 "Interface": intf
5191 }
5192 }
5193 }
5194 }
5195 }
5196 result = verify_ip_nht(tgen, input_dict_4)
5197 Returns
5198 -------
5199 errormsg(str) or True
5200 """
5201
5202 logger.debug("Entering lib API: verify_ip_nht()")
5203
5204 router_list = tgen.routers()
5205 for router in input_dict.keys():
5206 if router not in router_list:
5207 continue
5208
5209 rnode = router_list[router]
5210 nh_list = input_dict[router]
5211
5212 if validate_ip_address(next(iter(nh_list))) == "ipv6":
5213 show_ip_nht = run_frr_cmd(rnode, "show ipv6 nht")
5214 else:
5215 show_ip_nht = run_frr_cmd(rnode, "show ip nht")
5216
5217 for nh in nh_list:
5218 if nh in show_ip_nht:
5219 nht = run_frr_cmd(rnode, "show ip nht {}".format(nh))
5220 if "unresolved" in nht:
5221 errormsg = "Nexthop {} became unresolved on {}".format(nh, router)
5222 return errormsg
5223 else:
5224 logger.info("Nexthop %s is resolved on %s", nh, router)
5225 return True
5226 else:
5227 errormsg = "Nexthop {} is resolved on {}".format(nh, router)
5228 return errormsg
5229
5230 logger.debug("Exiting lib API: verify_ip_nht()")
5231 return False
5232
5233
5234 def scapy_send_raw_packet(tgen, topo, senderRouter, intf, packet=None):
5235 """
5236 Using scapy Raw() method to send BSR raw packet from one FRR
5237 to other
5238
5239 Parameters:
5240 -----------
5241 * `tgen` : Topogen object
5242 * `topo` : json file data
5243 * `senderRouter` : Sender router
5244 * `packet` : packet in raw format
5245
5246 returns:
5247 --------
5248 errormsg or True
5249 """
5250
5251 global CD
5252 result = ""
5253 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
5254 sender_interface = intf
5255 rnode = tgen.routers()[senderRouter]
5256
5257 for destLink, data in topo["routers"][senderRouter]["links"].items():
5258 if "type" in data and data["type"] == "loopback":
5259 continue
5260
5261 if not packet:
5262 packet = topo["routers"][senderRouter]["pkt"]["test_packets"][packet][
5263 "data"
5264 ]
5265
5266 python3_path = tgen.net.get_exec_path(["python3", "python"])
5267 script_path = os.path.join(CD, "send_bsr_packet.py")
5268 cmd = "{} {} '{}' '{}' --interval=1 --count=1".format(
5269 python3_path, script_path, packet, sender_interface
5270 )
5271
5272 logger.info("Scapy cmd: \n %s", cmd)
5273 result = rnode.run(cmd)
5274
5275 if result == "":
5276 return result
5277
5278 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
5279 return True