]> git.proxmox.com Git - mirror_frr.git/blob - tests/topotests/lib/common_config.py
zebra: Unlock the route node when sending route notifications
[mirror_frr.git] / tests / topotests / lib / common_config.py
1 # SPDX-License-Identifier: ISC
2 #
3 # Copyright (c) 2019 by VMware, Inc. ("VMware")
4 # Used Copyright (c) 2018 by Network Device Education Foundation, Inc.
5 # ("NetDEF") in this file.
6 #
7
8 import ipaddress
9 import json
10 import os
11 import platform
12 import socket
13 import subprocess
14 import sys
15 import traceback
16 import functools
17 from collections import OrderedDict
18 from copy import deepcopy
19 from datetime import datetime, timedelta
20 from functools import wraps
21 from re import search as re_search
22 from time import sleep
23
24 try:
25 # Imports from python2
26 import ConfigParser as configparser
27 except ImportError:
28 # Imports from python3
29 import configparser
30
31 from lib.micronet import comm_error
32 from lib.topogen import TopoRouter, get_topogen
33 from lib.topolog import get_logger, logger
34 from lib.topotest import frr_unicode, interface_set_status, version_cmp
35 from lib import topotest
36
37 FRRCFG_FILE = "frr_json.conf"
38 FRRCFG_BKUP_FILE = "frr_json_initial.conf"
39
40 ERROR_LIST = ["Malformed", "Failure", "Unknown", "Incomplete"]
41
42 ####
43 CD = os.path.dirname(os.path.realpath(__file__))
44 PYTESTINI_PATH = os.path.join(CD, "../pytest.ini")
45
46 # NOTE: to save execution logs to log file frrtest_log_dir must be configured
47 # in `pytest.ini`.
48 config = configparser.ConfigParser()
49 config.read(PYTESTINI_PATH)
50
51 config_section = "topogen"
52
53 # Debug logs for daemons
54 DEBUG_LOGS = {
55 "pimd": [
56 "debug msdp events",
57 "debug msdp packets",
58 "debug igmp events",
59 "debug igmp trace",
60 "debug mroute",
61 "debug mroute detail",
62 "debug pim events",
63 "debug pim packets",
64 "debug pim trace",
65 "debug pim zebra",
66 "debug pim bsm",
67 "debug pim packets joins",
68 "debug pim packets register",
69 "debug pim nht",
70 ],
71 "pim6d": [
72 "debug pimv6 events",
73 "debug pimv6 packets",
74 "debug pimv6 packet-dump send",
75 "debug pimv6 packet-dump receive",
76 "debug pimv6 trace",
77 "debug pimv6 trace detail",
78 "debug pimv6 zebra",
79 "debug pimv6 bsm",
80 "debug pimv6 packets hello",
81 "debug pimv6 packets joins",
82 "debug pimv6 packets register",
83 "debug pimv6 nht",
84 "debug pimv6 nht detail",
85 "debug mroute6",
86 "debug mroute6 detail",
87 "debug mld events",
88 "debug mld packets",
89 "debug mld trace",
90 ],
91 "bgpd": [
92 "debug bgp neighbor-events",
93 "debug bgp updates",
94 "debug bgp zebra",
95 "debug bgp nht",
96 "debug bgp neighbor-events",
97 "debug bgp graceful-restart",
98 "debug bgp update-groups",
99 "debug bgp vpn leak-from-vrf",
100 "debug bgp vpn leak-to-vrf",
101 "debug bgp zebr",
102 "debug bgp updates",
103 "debug bgp nht",
104 "debug bgp neighbor-events",
105 "debug vrf",
106 ],
107 "zebra": [
108 "debug zebra events",
109 "debug zebra rib",
110 "debug zebra vxlan",
111 "debug zebra nht",
112 ],
113 "mgmt": [],
114 "ospf": [
115 "debug ospf event",
116 "debug ospf ism",
117 "debug ospf lsa",
118 "debug ospf nsm",
119 "debug ospf nssa",
120 "debug ospf packet all",
121 "debug ospf sr",
122 "debug ospf te",
123 "debug ospf zebra",
124 ],
125 "ospf6": [
126 "debug ospf6 event",
127 "debug ospf6 ism",
128 "debug ospf6 lsa",
129 "debug ospf6 nsm",
130 "debug ospf6 nssa",
131 "debug ospf6 packet all",
132 "debug ospf6 sr",
133 "debug ospf6 te",
134 "debug ospf6 zebra",
135 ],
136 }
137
138 g_iperf_client_procs = {}
139 g_iperf_server_procs = {}
140
141
142 def is_string(value):
143 try:
144 return isinstance(value, basestring)
145 except NameError:
146 return isinstance(value, str)
147
148
149 if config.has_option("topogen", "verbosity"):
150 loglevel = config.get("topogen", "verbosity")
151 loglevel = loglevel.lower()
152 else:
153 loglevel = "info"
154
155 if config.has_option("topogen", "frrtest_log_dir"):
156 frrtest_log_dir = config.get("topogen", "frrtest_log_dir")
157 time_stamp = datetime.time(datetime.now())
158 logfile_name = "frr_test_bgp_"
159 frrtest_log_file = frrtest_log_dir + logfile_name + str(time_stamp)
160 print("frrtest_log_file..", frrtest_log_file)
161
162 logger = get_logger(
163 "test_execution_logs", log_level=loglevel, target=frrtest_log_file
164 )
165 print("Logs will be sent to logfile: {}".format(frrtest_log_file))
166
167 if config.has_option("topogen", "show_router_config"):
168 show_router_config = config.get("topogen", "show_router_config")
169 else:
170 show_router_config = False
171
172 # env variable for setting what address type to test
173 ADDRESS_TYPES = os.environ.get("ADDRESS_TYPES")
174
175
176 # Saves sequence id numbers
177 SEQ_ID = {"prefix_lists": {}, "route_maps": {}}
178
179
180 def get_seq_id(obj_type, router, obj_name):
181 """
182 Generates and saves sequence number in interval of 10
183 Parameters
184 ----------
185 * `obj_type`: prefix_lists or route_maps
186 * `router`: router name
187 *` obj_name`: name of the prefix-list or route-map
188 Returns
189 --------
190 Sequence number generated
191 """
192
193 router_data = SEQ_ID[obj_type].setdefault(router, {})
194 obj_data = router_data.setdefault(obj_name, {})
195 seq_id = obj_data.setdefault("seq_id", 0)
196
197 seq_id = int(seq_id) + 10
198 obj_data["seq_id"] = seq_id
199
200 return seq_id
201
202
203 def set_seq_id(obj_type, router, id, obj_name):
204 """
205 Saves sequence number if not auto-generated and given by user
206 Parameters
207 ----------
208 * `obj_type`: prefix_lists or route_maps
209 * `router`: router name
210 *` obj_name`: name of the prefix-list or route-map
211 """
212 router_data = SEQ_ID[obj_type].setdefault(router, {})
213 obj_data = router_data.setdefault(obj_name, {})
214 seq_id = obj_data.setdefault("seq_id", 0)
215
216 seq_id = int(seq_id) + int(id)
217 obj_data["seq_id"] = seq_id
218
219
220 class InvalidCLIError(Exception):
221 """Raise when the CLI command is wrong"""
222
223
224 def run_frr_cmd(rnode, cmd, isjson=False):
225 """
226 Execute frr show commands in privileged mode
227 * `rnode`: router node on which command needs to be executed
228 * `cmd`: Command to be executed on frr
229 * `isjson`: If command is to get json data or not
230 :return str:
231 """
232
233 if cmd:
234 ret_data = rnode.vtysh_cmd(cmd, isjson=isjson)
235
236 if isjson:
237 rnode.vtysh_cmd(cmd.rstrip("json"), isjson=False)
238
239 return ret_data
240
241 else:
242 raise InvalidCLIError("No actual cmd passed")
243
244
245 def apply_raw_config(tgen, input_dict):
246 """
247 API to configure raw configuration on device. This can be used for any cli
248 which has not been implemented in JSON.
249
250 Parameters
251 ----------
252 * `tgen`: tgen object
253 * `input_dict`: configuration that needs to be applied
254
255 Usage
256 -----
257 input_dict = {
258 "r2": {
259 "raw_config": [
260 "router bgp",
261 "no bgp update-group-split-horizon"
262 ]
263 }
264 }
265 Returns
266 -------
267 True or errormsg
268 """
269
270 rlist = []
271
272 for router_name in input_dict.keys():
273 config_cmd = input_dict[router_name]["raw_config"]
274
275 if not isinstance(config_cmd, list):
276 config_cmd = [config_cmd]
277
278 frr_cfg_file = "{}/{}/{}".format(tgen.logdir, router_name, FRRCFG_FILE)
279 with open(frr_cfg_file, "w") as cfg:
280 for cmd in config_cmd:
281 cfg.write("{}\n".format(cmd))
282
283 rlist.append(router_name)
284
285 # Load config on all routers
286 return load_config_to_routers(tgen, rlist)
287
288
289 def create_common_configurations(
290 tgen, config_dict, config_type=None, build=False, load_config=True
291 ):
292 """
293 API to create object of class FRRConfig and also create frr_json.conf
294 file. It will create interface and common configurations and save it to
295 frr_json.conf and load to router
296 Parameters
297 ----------
298 * `tgen`: tgen object
299 * `config_dict`: Configuration data saved in a dict of { router: config-list }
300 * `routers` : list of router id to be configured.
301 * `config_type` : Syntactic information while writing configuration. Should
302 be one of the value as mentioned in the config_map below.
303 * `build` : Only for initial setup phase this is set as True
304 Returns
305 -------
306 True or False
307 """
308
309 config_map = OrderedDict(
310 {
311 "general_config": "! FRR General Config\n",
312 "debug_log_config": "! Debug log Config\n",
313 "interface_config": "! Interfaces Config\n",
314 "static_route": "! Static Route Config\n",
315 "prefix_list": "! Prefix List Config\n",
316 "bgp_community_list": "! Community List Config\n",
317 "route_maps": "! Route Maps Config\n",
318 "bgp": "! BGP Config\n",
319 "vrf": "! VRF Config\n",
320 "ospf": "! OSPF Config\n",
321 "ospf6": "! OSPF Config\n",
322 "pim": "! PIM Config\n",
323 }
324 )
325
326 if build:
327 mode = "a"
328 elif not load_config:
329 mode = "a"
330 else:
331 mode = "w"
332
333 routers = config_dict.keys()
334 for router in routers:
335 fname = "{}/{}/{}".format(tgen.logdir, router, FRRCFG_FILE)
336 try:
337 frr_cfg_fd = open(fname, mode)
338 if config_type:
339 frr_cfg_fd.write(config_map[config_type])
340 for line in config_dict[router]:
341 frr_cfg_fd.write("{} \n".format(str(line)))
342 frr_cfg_fd.write("\n")
343
344 except IOError as err:
345 logger.error("Unable to open FRR Config '%s': %s" % (fname, str(err)))
346 return False
347 finally:
348 frr_cfg_fd.close()
349
350 # If configuration applied from build, it will done at last
351 result = True
352 if not build and load_config:
353 result = load_config_to_routers(tgen, routers)
354
355 return result
356
357
358 def create_common_configuration(
359 tgen, router, data, config_type=None, build=False, load_config=True
360 ):
361 """
362 API to create object of class FRRConfig and also create frr_json.conf
363 file. It will create interface and common configurations and save it to
364 frr_json.conf and load to router
365 Parameters
366 ----------
367 * `tgen`: tgen object
368 * `data`: Configuration data saved in a list.
369 * `router` : router id to be configured.
370 * `config_type` : Syntactic information while writing configuration. Should
371 be one of the value as mentioned in the config_map below.
372 * `build` : Only for initial setup phase this is set as True
373 Returns
374 -------
375 True or False
376 """
377 return create_common_configurations(
378 tgen, {router: data}, config_type, build, load_config
379 )
380
381
382 def kill_router_daemons(tgen, router, daemons, save_config=True):
383 """
384 Router's current config would be saved to /etc/frr/ for each daemon
385 and daemon would be killed forcefully using SIGKILL.
386 * `tgen` : topogen object
387 * `router`: Device under test
388 * `daemons`: list of daemons to be killed
389 """
390
391 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
392
393 try:
394 router_list = tgen.routers()
395
396 if save_config:
397 # Saving router config to /etc/frr, which will be loaded to router
398 # when it starts
399 router_list[router].vtysh_cmd("write memory")
400
401 # Kill Daemons
402 result = router_list[router].killDaemons(daemons)
403 if len(result) > 0:
404 assert "Errors found post shutdown - details follow:" == 0, result
405 return result
406
407 except Exception as e:
408 errormsg = traceback.format_exc()
409 logger.error(errormsg)
410 return errormsg
411
412
413 def start_router_daemons(tgen, router, daemons):
414 """
415 Daemons defined by user would be started
416 * `tgen` : topogen object
417 * `router`: Device under test
418 * `daemons`: list of daemons to be killed
419 """
420
421 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
422
423 try:
424 router_list = tgen.routers()
425
426 # Start daemons
427 res = router_list[router].startDaemons(daemons)
428
429 except Exception as e:
430 errormsg = traceback.format_exc()
431 logger.error(errormsg)
432 res = errormsg
433
434 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
435 return res
436
437
438 def check_router_status(tgen):
439 """
440 Check if all daemons are running for all routers in topology
441 * `tgen` : topogen object
442 """
443
444 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
445
446 try:
447 router_list = tgen.routers()
448 for router, rnode in router_list.items():
449 result = rnode.check_router_running()
450 if result != "":
451 daemons = []
452 if "mgmtd" in result:
453 daemons.append("mgmtd")
454 if "bgpd" in result:
455 daemons.append("bgpd")
456 if "zebra" in result:
457 daemons.append("zebra")
458 if "pimd" in result:
459 daemons.append("pimd")
460 if "pim6d" in result:
461 daemons.append("pim6d")
462 if "ospfd" in result:
463 daemons.append("ospfd")
464 if "ospf6d" in result:
465 daemons.append("ospf6d")
466 rnode.startDaemons(daemons)
467
468 except Exception as e:
469 errormsg = traceback.format_exc()
470 logger.error(errormsg)
471 return errormsg
472
473 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
474 return True
475
476
477 def save_initial_config_on_routers(tgen):
478 """Save current configuration on routers to FRRCFG_BKUP_FILE.
479
480 FRRCFG_BKUP_FILE is the file that will be restored when `reset_config_on_routers()`
481 is called.
482
483 Parameters
484 ----------
485 * `tgen` : Topogen object
486 """
487 router_list = tgen.routers()
488 target_cfg_fmt = tgen.logdir + "/{}/frr_json_initial.conf"
489
490 # Get all running configs in parallel
491 procs = {}
492 for rname in router_list:
493 logger.debug("Fetching running config for router %s", rname)
494 procs[rname] = router_list[rname].popen(
495 ["/usr/bin/env", "vtysh", "-c", "show running-config no-header"],
496 stdin=None,
497 stdout=open(target_cfg_fmt.format(rname), "w"),
498 stderr=subprocess.PIPE,
499 )
500 for rname, p in procs.items():
501 _, error = p.communicate()
502 if p.returncode:
503 logger.error(
504 "Get running config for %s failed %d: %s", rname, p.returncode, error
505 )
506 raise InvalidCLIError(
507 "vtysh show running error on {}: {}".format(rname, error)
508 )
509
510
511 def reset_config_on_routers(tgen, routerName=None):
512 """
513 Resets configuration on routers to the snapshot created using input JSON
514 file. It replaces existing router configuration with FRRCFG_BKUP_FILE
515
516 Parameters
517 ----------
518 * `tgen` : Topogen object
519 * `routerName` : router config is to be reset
520 """
521
522 logger.debug("Entering API: reset_config_on_routers")
523
524 tgen.cfg_gen += 1
525 gen = tgen.cfg_gen
526
527 # Trim the router list if needed
528 router_list = tgen.routers()
529 if routerName:
530 if routerName not in router_list:
531 logger.warning(
532 "Exiting API: reset_config_on_routers: no router %s",
533 routerName,
534 exc_info=True,
535 )
536 return True
537 router_list = {routerName: router_list[routerName]}
538
539 delta_fmt = tgen.logdir + "/{}/delta-{}.conf"
540 # FRRCFG_BKUP_FILE
541 target_cfg_fmt = tgen.logdir + "/{}/frr_json_initial.conf"
542 run_cfg_fmt = tgen.logdir + "/{}/frr-{}.sav"
543
544 #
545 # Get all running configs in parallel
546 #
547 procs = {}
548 for rname in router_list:
549 logger.debug("Fetching running config for router %s", rname)
550 procs[rname] = router_list[rname].popen(
551 ["/usr/bin/env", "vtysh", "-c", "show running-config no-header"],
552 stdin=None,
553 stdout=open(run_cfg_fmt.format(rname, gen), "w"),
554 stderr=subprocess.PIPE,
555 )
556 for rname, p in procs.items():
557 _, error = p.communicate()
558 if p.returncode:
559 logger.error(
560 "Get running config for %s failed %d: %s", rname, p.returncode, error
561 )
562 raise InvalidCLIError(
563 "vtysh show running error on {}: {}".format(rname, error)
564 )
565
566 #
567 # Get all delta's in parallel
568 #
569 procs = {}
570 for rname in router_list:
571 logger.debug(
572 "Generating delta for router %s to new configuration (gen %d)", rname, gen
573 )
574 procs[rname] = tgen.net.popen(
575 [
576 "/usr/lib/frr/frr-reload.py",
577 "--test-reset",
578 "--input",
579 run_cfg_fmt.format(rname, gen),
580 "--test",
581 target_cfg_fmt.format(rname),
582 ],
583 stdin=None,
584 stdout=open(delta_fmt.format(rname, gen), "w"),
585 stderr=subprocess.PIPE,
586 )
587 for rname, p in procs.items():
588 _, error = p.communicate()
589 if p.returncode:
590 logger.error(
591 "Delta file creation for %s failed %d: %s", rname, p.returncode, error
592 )
593 raise InvalidCLIError("frr-reload error for {}: {}".format(rname, error))
594
595 #
596 # Apply all the deltas in parallel
597 #
598 procs = {}
599 for rname in router_list:
600 logger.debug("Applying delta config on router %s", rname)
601
602 procs[rname] = router_list[rname].popen(
603 ["/usr/bin/env", "vtysh", "-f", delta_fmt.format(rname, gen)],
604 stdin=None,
605 stdout=subprocess.PIPE,
606 stderr=subprocess.STDOUT,
607 )
608 for rname, p in procs.items():
609 output, _ = p.communicate()
610 vtysh_command = "vtysh -f {}".format(delta_fmt.format(rname, gen))
611 if not p.returncode:
612 router_list[rname].logger.debug(
613 '\nvtysh config apply => "{}"\nvtysh output <= "{}"'.format(
614 vtysh_command, output
615 )
616 )
617 else:
618 router_list[rname].logger.warning(
619 '\nvtysh config apply failed => "{}"\nvtysh output <= "{}"'.format(
620 vtysh_command, output
621 )
622 )
623 logger.error(
624 "Delta file apply for %s failed %d: %s", rname, p.returncode, output
625 )
626
627 # We really need to enable this failure; however, currently frr-reload.py
628 # producing invalid "no" commands as it just preprends "no", but some of the
629 # command forms lack matching values (e.g., final values). Until frr-reload
630 # is fixed to handle this (or all the CLI no forms are adjusted) we can't
631 # fail tests.
632 # raise InvalidCLIError("frr-reload error for {}: {}".format(rname, output))
633
634 #
635 # Optionally log all new running config if "show_router_config" is defined in
636 # "pytest.ini"
637 #
638 if show_router_config:
639 procs = {}
640 for rname in router_list:
641 logger.debug("Fetching running config for router %s", rname)
642 procs[rname] = router_list[rname].popen(
643 ["/usr/bin/env", "vtysh", "-c", "show running-config no-header"],
644 stdin=None,
645 stdout=subprocess.PIPE,
646 stderr=subprocess.STDOUT,
647 )
648 for rname, p in procs.items():
649 output, _ = p.communicate()
650 if p.returncode:
651 logger.warning(
652 "Get running config for %s failed %d: %s",
653 rname,
654 p.returncode,
655 output,
656 )
657 else:
658 logger.debug(
659 "Configuration on router %s after reset:\n%s", rname, output
660 )
661
662 logger.debug("Exiting API: reset_config_on_routers")
663 return True
664
665
666 def prep_load_config_to_routers(tgen, *config_name_list):
667 """Create common config for `load_config_to_routers`.
668
669 The common config file is constructed from the list of sub-config files passed as
670 position arguments to this function. Each entry in `config_name_list` is looked for
671 under the router sub-directory in the test directory and those files are
672 concatenated together to create the common config. e.g.,
673
674 # Routers are "r1" and "r2", test file is `example/test_example_foo.py`
675 prepare_load_config_to_routers(tgen, "bgpd.conf", "ospfd.conf")
676
677 When the above call is made the files in
678
679 example/r1/bgpd.conf
680 example/r1/ospfd.conf
681
682 Are concat'd together into a single config file that will be loaded on r1, and
683
684 example/r2/bgpd.conf
685 example/r2/ospfd.conf
686
687 Are concat'd together into a single config file that will be loaded on r2 when
688 the call to `load_config_to_routers` is made.
689 """
690
691 routers = tgen.routers()
692 for rname, router in routers.items():
693 destname = "{}/{}/{}".format(tgen.logdir, rname, FRRCFG_FILE)
694 wmode = "w"
695 for cfbase in config_name_list:
696 script_dir = os.environ["PYTEST_TOPOTEST_SCRIPTDIR"]
697 confname = os.path.join(script_dir, "{}/{}".format(rname, cfbase))
698 with open(confname, "r") as cf:
699 with open(destname, wmode) as df:
700 df.write(cf.read())
701 wmode = "a"
702
703
704 def load_config_to_routers(tgen, routers, save_bkup=False):
705 """
706 Loads configuration on routers from the file FRRCFG_FILE.
707
708 Parameters
709 ----------
710 * `tgen` : Topogen object
711 * `routers` : routers for which configuration is to be loaded
712 * `save_bkup` : If True, Saves snapshot of FRRCFG_FILE to FRRCFG_BKUP_FILE
713 Returns
714 -------
715 True or False
716 """
717
718 logger.debug("Entering API: load_config_to_routers")
719
720 tgen.cfg_gen += 1
721 gen = tgen.cfg_gen
722
723 base_router_list = tgen.routers()
724 router_list = {}
725 for router in routers:
726 if router not in base_router_list:
727 continue
728 router_list[router] = base_router_list[router]
729
730 frr_cfg_file_fmt = tgen.logdir + "/{}/" + FRRCFG_FILE
731 frr_cfg_save_file_fmt = tgen.logdir + "/{}/{}-" + FRRCFG_FILE
732 frr_cfg_bkup_fmt = tgen.logdir + "/{}/" + FRRCFG_BKUP_FILE
733
734 procs = {}
735 for rname in router_list:
736 router = router_list[rname]
737 try:
738 frr_cfg_file = frr_cfg_file_fmt.format(rname)
739 frr_cfg_save_file = frr_cfg_save_file_fmt.format(rname, gen)
740 frr_cfg_bkup = frr_cfg_bkup_fmt.format(rname)
741 with open(frr_cfg_file, "r+") as cfg:
742 data = cfg.read()
743 logger.debug(
744 "Applying following configuration on router %s (gen: %d):\n%s",
745 rname,
746 gen,
747 data,
748 )
749 # Always save a copy of what we just did
750 with open(frr_cfg_save_file, "w") as bkup:
751 bkup.write(data)
752 if save_bkup:
753 with open(frr_cfg_bkup, "w") as bkup:
754 bkup.write(data)
755 procs[rname] = router_list[rname].popen(
756 ["/usr/bin/env", "vtysh", "-f", frr_cfg_file],
757 stdin=None,
758 stdout=subprocess.PIPE,
759 stderr=subprocess.STDOUT,
760 )
761 except IOError as err:
762 logger.error(
763 "Unable to open config File. error(%s): %s", err.errno, err.strerror
764 )
765 return False
766 except Exception as error:
767 logger.error("Unable to apply config on %s: %s", rname, str(error))
768 return False
769
770 errors = []
771 for rname, p in procs.items():
772 output, _ = p.communicate()
773 frr_cfg_file = frr_cfg_file_fmt.format(rname)
774 vtysh_command = "vtysh -f " + frr_cfg_file
775 if not p.returncode:
776 router_list[rname].logger.debug(
777 '\nvtysh config apply => "{}"\nvtysh output <= "{}"'.format(
778 vtysh_command, output
779 )
780 )
781 else:
782 router_list[rname].logger.error(
783 '\nvtysh config apply failed => "{}"\nvtysh output <= "{}"'.format(
784 vtysh_command, output
785 )
786 )
787 logger.error(
788 "Config apply for %s failed %d: %s", rname, p.returncode, output
789 )
790 # We can't thorw an exception here as we won't clear the config file.
791 errors.append(
792 InvalidCLIError(
793 "load_config_to_routers error for {}: {}".format(rname, output)
794 )
795 )
796
797 # Empty the config file or we append to it next time through.
798 with open(frr_cfg_file, "r+") as cfg:
799 cfg.truncate(0)
800
801 # Router current configuration to log file or console if
802 # "show_router_config" is defined in "pytest.ini"
803 if show_router_config:
804 procs = {}
805 for rname in router_list:
806 procs[rname] = router_list[rname].popen(
807 ["/usr/bin/env", "vtysh", "-c", "show running-config no-header"],
808 stdin=None,
809 stdout=subprocess.PIPE,
810 stderr=subprocess.STDOUT,
811 )
812 for rname, p in procs.items():
813 output, _ = p.communicate()
814 if p.returncode:
815 logger.warning(
816 "Get running config for %s failed %d: %s",
817 rname,
818 p.returncode,
819 output,
820 )
821 else:
822 logger.debug("New configuration for router %s:\n%s", rname, output)
823
824 logger.debug("Exiting API: load_config_to_routers")
825 return not errors
826
827
828 def load_config_to_router(tgen, routerName, save_bkup=False):
829 """
830 Loads configuration on router from the file FRRCFG_FILE.
831
832 Parameters
833 ----------
834 * `tgen` : Topogen object
835 * `routerName` : router for which configuration to be loaded
836 * `save_bkup` : If True, Saves snapshot of FRRCFG_FILE to FRRCFG_BKUP_FILE
837 """
838 return load_config_to_routers(tgen, [routerName], save_bkup)
839
840
841 def reset_with_new_configs(tgen, *cflist):
842 """Reset the router to initial config, then load new configs.
843
844 Resets routers to the initial config state (see `save_initial_config_on_routers()
845 and `reset_config_on_routers()` `), then concat list of router sub-configs together
846 and load onto the routers (see `prep_load_config_to_routers()` and
847 `load_config_to_routers()`)
848 """
849 routers = tgen.routers()
850
851 reset_config_on_routers(tgen)
852 prep_load_config_to_routers(tgen, *cflist)
853 load_config_to_routers(tgen, tgen.routers(), save_bkup=False)
854
855
856 def get_frr_ipv6_linklocal(tgen, router, intf=None, vrf=None):
857 """
858 API to get the link local ipv6 address of a particular interface using
859 FRR command 'show interface'
860
861 * `tgen`: tgen object
862 * `router` : router for which highest interface should be
863 calculated
864 * `intf` : interface for which link-local address needs to be taken
865 * `vrf` : VRF name
866
867 Usage
868 -----
869 linklocal = get_frr_ipv6_linklocal(tgen, router, "intf1", RED_A)
870
871 Returns
872 -------
873 1) array of interface names to link local ips.
874 """
875
876 router_list = tgen.routers()
877 for rname, rnode in router_list.items():
878 if rname != router:
879 continue
880
881 linklocal = []
882
883 if vrf:
884 cmd = "show interface vrf {}".format(vrf)
885 else:
886 cmd = "show interface"
887
888 linklocal = []
889 if vrf:
890 cmd = "show interface vrf {}".format(vrf)
891 else:
892 cmd = "show interface"
893 for chk_ll in range(0, 60):
894 sleep(1 / 4)
895 ifaces = router_list[router].run('vtysh -c "{}"'.format(cmd))
896 # Fix newlines (make them all the same)
897 ifaces = ("\n".join(ifaces.splitlines()) + "\n").splitlines()
898
899 interface = None
900 ll_per_if_count = 0
901 for line in ifaces:
902 # Interface name
903 m = re_search("Interface ([a-zA-Z0-9-]+) is", line)
904 if m:
905 interface = m.group(1).split(" ")[0]
906 ll_per_if_count = 0
907
908 # Interface ip
909 m1 = re_search("inet6 (fe80[:a-fA-F0-9]+/[0-9]+)", line)
910 if m1:
911 local = m1.group(1)
912 ll_per_if_count += 1
913 if ll_per_if_count > 1:
914 linklocal += [["%s-%s" % (interface, ll_per_if_count), local]]
915 else:
916 linklocal += [[interface, local]]
917
918 try:
919 if linklocal:
920 if intf:
921 return [
922 _linklocal[1]
923 for _linklocal in linklocal
924 if _linklocal[0] == intf
925 ][0].split("/")[0]
926 return linklocal
927 except IndexError:
928 continue
929
930 errormsg = "Link local ip missing on router {}".format(router)
931 return errormsg
932
933
934 def generate_support_bundle():
935 """
936 API to generate support bundle on any verification ste failure.
937 it runs a python utility, /usr/lib/frr/generate_support_bundle.py,
938 which basically runs defined CLIs and dumps the data to specified location
939 """
940
941 tgen = get_topogen()
942 router_list = tgen.routers()
943 test_name = os.environ.get("PYTEST_CURRENT_TEST").split(":")[-1].split(" ")[0]
944
945 bundle_procs = {}
946 for rname, rnode in router_list.items():
947 logger.info("Spawn collection of support bundle for %s", rname)
948 dst_bundle = "{}/{}/support_bundles/{}".format(tgen.logdir, rname, test_name)
949 rnode.run("mkdir -p " + dst_bundle)
950
951 gen_sup_cmd = [
952 "/usr/lib/frr/generate_support_bundle.py",
953 "--log-dir=" + dst_bundle,
954 ]
955 bundle_procs[rname] = tgen.net[rname].popen(gen_sup_cmd, stdin=None)
956
957 for rname, rnode in router_list.items():
958 logger.debug("Waiting on support bundle for %s", rname)
959 output, error = bundle_procs[rname].communicate()
960 if output:
961 logger.debug(
962 "Output from collecting support bundle for %s:\n%s", rname, output
963 )
964 if error:
965 logger.warning(
966 "Error from collecting support bundle for %s:\n%s", rname, error
967 )
968
969 return True
970
971
972 def start_topology(tgen):
973 """
974 Starting topology, create tmp files which are loaded to routers
975 to start daemons and then start routers
976 * `tgen` : topogen object
977 """
978
979 # Starting topology
980 tgen.start_topology()
981
982 # Starting daemons
983
984 router_list = tgen.routers()
985 routers_sorted = sorted(
986 router_list.keys(), key=lambda x: int(re_search("[0-9]+", x).group(0))
987 )
988
989 linux_ver = ""
990 router_list = tgen.routers()
991 for rname in routers_sorted:
992 router = router_list[rname]
993
994 # It will help in debugging the failures, will give more details on which
995 # specific kernel version tests are failing
996 if linux_ver == "":
997 linux_ver = router.run("uname -a")
998 logger.info("Logging platform related details: \n %s \n", linux_ver)
999
1000 try:
1001 os.chdir(tgen.logdir)
1002
1003 # # Creating router named dir and empty zebra.conf bgpd.conf files
1004 # # inside the current directory
1005 # if os.path.isdir("{}".format(rname)):
1006 # os.system("rm -rf {}".format(rname))
1007 # os.mkdir("{}".format(rname))
1008 # os.system("chmod -R go+rw {}".format(rname))
1009 # os.chdir("{}/{}".format(tgen.logdir, rname))
1010 # os.system("touch zebra.conf bgpd.conf")
1011 # else:
1012 # os.mkdir("{}".format(rname))
1013 # os.system("chmod -R go+rw {}".format(rname))
1014 # os.chdir("{}/{}".format(tgen.logdir, rname))
1015 # os.system("touch zebra.conf bgpd.conf")
1016
1017 except IOError as err:
1018 logger.error("I/O error({0}): {1}".format(err.errno, err.strerror))
1019
1020 topo = tgen.json_topo
1021 feature = set()
1022
1023 if "feature" in topo:
1024 feature.update(topo["feature"])
1025
1026 if rname in topo["routers"]:
1027 for key in topo["routers"][rname].keys():
1028 feature.add(key)
1029
1030 for val in topo["routers"][rname]["links"].values():
1031 if "pim" in val:
1032 feature.add("pim")
1033 break
1034 for val in topo["routers"][rname]["links"].values():
1035 if "pim6" in val:
1036 feature.add("pim6")
1037 break
1038 for val in topo["routers"][rname]["links"].values():
1039 if "ospf6" in val:
1040 feature.add("ospf6")
1041 break
1042 if "switches" in topo and rname in topo["switches"]:
1043 for val in topo["switches"][rname]["links"].values():
1044 if "ospf" in val:
1045 feature.add("ospf")
1046 break
1047 if "ospf6" in val:
1048 feature.add("ospf6")
1049 break
1050
1051 # Loading empty mgmtd.conf file to router, to start the mgmtd daemon
1052 router.load_config(
1053 TopoRouter.RD_MGMTD, "{}/{}/mgmtd.conf".format(tgen.logdir, rname)
1054 )
1055
1056 # Loading empty zebra.conf file to router, to start the zebra deamon
1057 router.load_config(
1058 TopoRouter.RD_ZEBRA, "{}/{}/zebra.conf".format(tgen.logdir, rname)
1059 )
1060
1061 # Loading empty bgpd.conf file to router, to start the bgp deamon
1062 if "bgp" in feature:
1063 router.load_config(
1064 TopoRouter.RD_BGP, "{}/{}/bgpd.conf".format(tgen.logdir, rname)
1065 )
1066
1067 # Loading empty pimd.conf file to router, to start the pim deamon
1068 if "pim" in feature:
1069 router.load_config(
1070 TopoRouter.RD_PIM, "{}/{}/pimd.conf".format(tgen.logdir, rname)
1071 )
1072
1073 # Loading empty pimd.conf file to router, to start the pim deamon
1074 if "pim6" in feature:
1075 router.load_config(
1076 TopoRouter.RD_PIM6, "{}/{}/pim6d.conf".format(tgen.logdir, rname)
1077 )
1078
1079 if "ospf" in feature:
1080 # Loading empty ospf.conf file to router, to start the ospf deamon
1081 router.load_config(
1082 TopoRouter.RD_OSPF, "{}/{}/ospfd.conf".format(tgen.logdir, rname)
1083 )
1084
1085 if "ospf6" in feature:
1086 # Loading empty ospf.conf file to router, to start the ospf deamon
1087 router.load_config(
1088 TopoRouter.RD_OSPF6, "{}/{}/ospf6d.conf".format(tgen.logdir, rname)
1089 )
1090
1091 # Starting routers
1092 logger.info("Starting all routers once topology is created")
1093 tgen.start_router()
1094
1095
1096 def stop_router(tgen, router):
1097 """
1098 Router"s current config would be saved to /tmp/topotest/<suite>/<router> for each daemon
1099 and router and its daemons would be stopped.
1100
1101 * `tgen` : topogen object
1102 * `router`: Device under test
1103 """
1104
1105 router_list = tgen.routers()
1106
1107 # Saving router config to /etc/frr, which will be loaded to router
1108 # when it starts
1109 router_list[router].vtysh_cmd("write memory")
1110
1111 # Stop router
1112 router_list[router].stop()
1113
1114
1115 def start_router(tgen, router):
1116 """
1117 Router will be started and config would be loaded from /tmp/topotest/<suite>/<router> for each
1118 daemon
1119
1120 * `tgen` : topogen object
1121 * `router`: Device under test
1122 """
1123
1124 logger.debug("Entering lib API: start_router")
1125
1126 try:
1127 router_list = tgen.routers()
1128
1129 # Router and its daemons would be started and config would
1130 # be loaded to router for each daemon from /etc/frr
1131 router_list[router].start()
1132
1133 # Waiting for router to come up
1134 sleep(5)
1135
1136 except Exception as e:
1137 errormsg = traceback.format_exc()
1138 logger.error(errormsg)
1139 return errormsg
1140
1141 logger.debug("Exiting lib API: start_router()")
1142 return True
1143
1144
1145 def number_to_row(routerName):
1146 """
1147 Returns the number for the router.
1148 Calculation based on name a0 = row 0, a1 = row 1, b2 = row 2, z23 = row 23
1149 etc
1150 """
1151 return int(routerName[1:])
1152
1153
1154 def number_to_column(routerName):
1155 """
1156 Returns the number for the router.
1157 Calculation based on name a0 = columnn 0, a1 = column 0, b2= column 1,
1158 z23 = column 26 etc
1159 """
1160 return ord(routerName[0]) - 97
1161
1162
1163 def topo_daemons(tgen, topo=None):
1164 """
1165 Returns daemon list required for the suite based on topojson.
1166 """
1167 daemon_list = []
1168
1169 if topo is None:
1170 topo = tgen.json_topo
1171
1172 router_list = tgen.routers()
1173 routers_sorted = sorted(
1174 router_list.keys(), key=lambda x: int(re_search("[0-9]+", x).group(0))
1175 )
1176
1177 for rtr in routers_sorted:
1178 if "ospf" in topo["routers"][rtr] and "ospfd" not in daemon_list:
1179 daemon_list.append("ospfd")
1180
1181 if "ospf6" in topo["routers"][rtr] and "ospf6d" not in daemon_list:
1182 daemon_list.append("ospf6d")
1183
1184 for val in topo["routers"][rtr]["links"].values():
1185 if "pim" in val and "pimd" not in daemon_list:
1186 daemon_list.append("pimd")
1187 if "pim6" in val and "pim6d" not in daemon_list:
1188 daemon_list.append("pim6d")
1189 if "ospf" in val and "ospfd" not in daemon_list:
1190 daemon_list.append("ospfd")
1191 if "ospf6" in val and "ospf6d" not in daemon_list:
1192 daemon_list.append("ospf6d")
1193 break
1194
1195 return daemon_list
1196
1197
1198 def add_interfaces_to_vlan(tgen, input_dict):
1199 """
1200 Add interfaces to VLAN, we need vlan pakcage to be installed on machine
1201
1202 * `tgen`: tgen onject
1203 * `input_dict` : interfaces to be added to vlans
1204
1205 input_dict= {
1206 "r1":{
1207 "vlan":{
1208 VLAN_1: [{
1209 intf_r1_s1: {
1210 "ip": "10.1.1.1",
1211 "subnet": "255.255.255.0
1212 }
1213 }]
1214 }
1215 }
1216 }
1217
1218 add_interfaces_to_vlan(tgen, input_dict)
1219
1220 """
1221
1222 router_list = tgen.routers()
1223 for dut in input_dict.keys():
1224 rnode = router_list[dut]
1225
1226 if "vlan" in input_dict[dut]:
1227 for vlan, interfaces in input_dict[dut]["vlan"].items():
1228 for intf_dict in interfaces:
1229 for interface, data in intf_dict.items():
1230 # Adding interface to VLAN
1231 vlan_intf = "{}.{}".format(interface, vlan)
1232 cmd = "ip link add link {} name {} type vlan id {}".format(
1233 interface, vlan_intf, vlan
1234 )
1235 logger.debug("[DUT: %s]: Running command: %s", dut, cmd)
1236 result = rnode.run(cmd)
1237 logger.debug("result %s", result)
1238
1239 # Bringing interface up
1240 cmd = "ip link set {} up".format(vlan_intf)
1241 logger.debug("[DUT: %s]: Running command: %s", dut, cmd)
1242 result = rnode.run(cmd)
1243 logger.debug("result %s", result)
1244
1245 # Assigning IP address
1246 ifaddr = ipaddress.ip_interface(
1247 "{}/{}".format(
1248 frr_unicode(data["ip"]), frr_unicode(data["subnet"])
1249 )
1250 )
1251
1252 cmd = "ip -{0} a flush {1} scope global && ip a add {2} dev {1} && ip l set {1} up".format(
1253 ifaddr.version, vlan_intf, ifaddr
1254 )
1255 logger.debug("[DUT: %s]: Running command: %s", dut, cmd)
1256 result = rnode.run(cmd)
1257 logger.debug("result %s", result)
1258
1259
1260 def tcpdump_capture_start(
1261 tgen,
1262 router,
1263 intf,
1264 protocol=None,
1265 grepstr=None,
1266 timeout=0,
1267 options=None,
1268 cap_file=None,
1269 background=True,
1270 ):
1271 """
1272 API to capture network packets using tcp dump.
1273
1274 Packages used :
1275
1276 Parameters
1277 ----------
1278 * `tgen`: topogen object.
1279 * `router`: router on which ping has to be performed.
1280 * `intf` : interface for capture.
1281 * `protocol` : protocol for which packet needs to be captured.
1282 * `grepstr` : string to filter out tcp dump output.
1283 * `timeout` : Time for which packet needs to be captured.
1284 * `options` : options for TCP dump, all tcpdump options can be used.
1285 * `cap_file` : filename to store capture dump.
1286 * `background` : Make tcp dump run in back ground.
1287
1288 Usage
1289 -----
1290 tcpdump_result = tcpdump_dut(tgen, 'r2', intf, protocol='tcp', timeout=20,
1291 options='-A -vv -x > r2bgp.txt ')
1292 Returns
1293 -------
1294 1) True for successful capture
1295 2) errormsg - when tcp dump fails
1296 """
1297
1298 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
1299
1300 rnode = tgen.gears[router]
1301
1302 if timeout > 0:
1303 cmd = "timeout {}".format(timeout)
1304 else:
1305 cmd = ""
1306
1307 cmdargs = "{} tcpdump".format(cmd)
1308
1309 if intf:
1310 cmdargs += " -i {}".format(str(intf))
1311 if protocol:
1312 cmdargs += " {}".format(str(protocol))
1313 if options:
1314 cmdargs += " -s 0 {}".format(str(options))
1315
1316 if cap_file:
1317 file_name = os.path.join(tgen.logdir, router, cap_file)
1318 cmdargs += " -w {}".format(str(file_name))
1319 # Remove existing capture file
1320 rnode.run("rm -rf {}".format(file_name))
1321
1322 if grepstr:
1323 cmdargs += ' | grep "{}"'.format(str(grepstr))
1324
1325 logger.info("Running tcpdump command: [%s]", cmdargs)
1326 if not background:
1327 rnode.run(cmdargs)
1328 else:
1329 # XXX this & is bogus doesn't work
1330 # rnode.run("nohup {} & /dev/null 2>&1".format(cmdargs))
1331 rnode.run("nohup {} > /dev/null 2>&1".format(cmdargs))
1332
1333 # Check if tcpdump process is running
1334 if background:
1335 result = rnode.run("pgrep tcpdump")
1336 logger.debug("ps -ef | grep tcpdump \n {}".format(result))
1337
1338 if not result:
1339 errormsg = "tcpdump is not running {}".format("tcpdump")
1340 return errormsg
1341 else:
1342 logger.info("Packet capture started on %s: interface %s", router, intf)
1343
1344 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
1345 return True
1346
1347
1348 def tcpdump_capture_stop(tgen, router):
1349 """
1350 API to capture network packets using tcp dump.
1351
1352 Packages used :
1353
1354 Parameters
1355 ----------
1356 * `tgen`: topogen object.
1357 * `router`: router on which ping has to be performed.
1358 * `intf` : interface for capture.
1359 * `protocol` : protocol for which packet needs to be captured.
1360 * `grepstr` : string to filter out tcp dump output.
1361 * `timeout` : Time for which packet needs to be captured.
1362 * `options` : options for TCP dump, all tcpdump options can be used.
1363 * `cap2file` : filename to store capture dump.
1364 * `bakgrnd` : Make tcp dump run in back ground.
1365
1366 Usage
1367 -----
1368 tcpdump_result = tcpdump_dut(tgen, 'r2', intf, protocol='tcp', timeout=20,
1369 options='-A -vv -x > r2bgp.txt ')
1370 Returns
1371 -------
1372 1) True for successful capture
1373 2) errormsg - when tcp dump fails
1374 """
1375
1376 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
1377
1378 rnode = tgen.gears[router]
1379
1380 # Check if tcpdump process is running
1381 result = rnode.run("ps -ef | grep tcpdump")
1382 logger.debug("ps -ef | grep tcpdump \n {}".format(result))
1383
1384 if not re_search(r"{}".format("tcpdump"), result):
1385 errormsg = "tcpdump is not running {}".format("tcpdump")
1386 return errormsg
1387 else:
1388 # XXX this doesn't work with micronet
1389 ppid = tgen.net.nameToNode[rnode.name].pid
1390 rnode.run("set +m; pkill -P %s tcpdump &> /dev/null" % ppid)
1391 logger.info("Stopped tcpdump capture")
1392
1393 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
1394 return True
1395
1396
1397 def create_debug_log_config(tgen, input_dict, build=False):
1398 """
1399 Enable/disable debug logs for any protocol with defined debug
1400 options and logs would be saved to created log file
1401
1402 Parameters
1403 ----------
1404 * `tgen` : Topogen object
1405 * `input_dict` : details to enable debug logs for protocols
1406 * `build` : Only for initial setup phase this is set as True.
1407
1408
1409 Usage:
1410 ------
1411 input_dict = {
1412 "r2": {
1413 "debug":{
1414 "log_file" : "debug.log",
1415 "enable": ["pimd", "zebra"],
1416 "disable": {
1417 "bgpd":[
1418 'debug bgp neighbor-events',
1419 'debug bgp updates',
1420 'debug bgp zebra',
1421 ]
1422 }
1423 }
1424 }
1425 }
1426
1427 result = create_debug_log_config(tgen, input_dict)
1428
1429 Returns
1430 -------
1431 True or False
1432 """
1433
1434 result = False
1435 try:
1436 debug_config_dict = {}
1437
1438 for router in input_dict.keys():
1439 debug_config = []
1440 if "debug" in input_dict[router]:
1441 debug_dict = input_dict[router]["debug"]
1442
1443 disable_logs = debug_dict.setdefault("disable", None)
1444 enable_logs = debug_dict.setdefault("enable", None)
1445 log_file = debug_dict.setdefault("log_file", None)
1446
1447 if log_file:
1448 _log_file = os.path.join(tgen.logdir, log_file)
1449 debug_config.append("log file {} \n".format(_log_file))
1450
1451 if type(enable_logs) is list:
1452 for daemon in enable_logs:
1453 for debug_log in DEBUG_LOGS[daemon]:
1454 debug_config.append("{}".format(debug_log))
1455 elif type(enable_logs) is dict:
1456 for daemon, debug_logs in enable_logs.items():
1457 for debug_log in debug_logs:
1458 debug_config.append("{}".format(debug_log))
1459
1460 if type(disable_logs) is list:
1461 for daemon in disable_logs:
1462 for debug_log in DEBUG_LOGS[daemon]:
1463 debug_config.append("no {}".format(debug_log))
1464 elif type(disable_logs) is dict:
1465 for daemon, debug_logs in disable_logs.items():
1466 for debug_log in debug_logs:
1467 debug_config.append("no {}".format(debug_log))
1468 if debug_config:
1469 debug_config_dict[router] = debug_config
1470
1471 result = create_common_configurations(
1472 tgen, debug_config_dict, "debug_log_config", build=build
1473 )
1474 except InvalidCLIError:
1475 # Traceback
1476 errormsg = traceback.format_exc()
1477 logger.error(errormsg)
1478 return errormsg
1479
1480 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
1481 return result
1482
1483
1484 #############################################
1485 # Common APIs, will be used by all protocols
1486 #############################################
1487
1488
1489 def create_vrf_cfg(tgen, topo, input_dict=None, build=False):
1490 """
1491 Create vrf configuration for created topology. VRF
1492 configuration is provided in input json file.
1493
1494 VRF config is done in Linux Kernel:
1495 * Create VRF
1496 * Attach interface to VRF
1497 * Bring up VRF
1498
1499 Parameters
1500 ----------
1501 * `tgen` : Topogen object
1502 * `topo` : json file data
1503 * `input_dict` : Input dict data, required when configuring
1504 from testcase
1505 * `build` : Only for initial setup phase this is set as True.
1506
1507 Usage
1508 -----
1509 input_dict={
1510 "r3": {
1511 "links": {
1512 "r2-link1": {"ipv4": "auto", "ipv6": "auto", "vrf": "RED_A"},
1513 "r2-link2": {"ipv4": "auto", "ipv6": "auto", "vrf": "RED_B"},
1514 "r2-link3": {"ipv4": "auto", "ipv6": "auto", "vrf": "BLUE_A"},
1515 "r2-link4": {"ipv4": "auto", "ipv6": "auto", "vrf": "BLUE_B"},
1516 },
1517 "vrfs":[
1518 {
1519 "name": "RED_A",
1520 "id": "1"
1521 },
1522 {
1523 "name": "RED_B",
1524 "id": "2"
1525 },
1526 {
1527 "name": "BLUE_A",
1528 "id": "3",
1529 "delete": True
1530 },
1531 {
1532 "name": "BLUE_B",
1533 "id": "4"
1534 }
1535 ]
1536 }
1537 }
1538 result = create_vrf_cfg(tgen, topo, input_dict)
1539
1540 Returns
1541 -------
1542 True or False
1543 """
1544 result = True
1545 if not input_dict:
1546 input_dict = deepcopy(topo)
1547 else:
1548 input_dict = deepcopy(input_dict)
1549
1550 try:
1551 config_data_dict = {}
1552
1553 for c_router, c_data in input_dict.items():
1554 rnode = tgen.gears[c_router]
1555 config_data = []
1556 if "vrfs" in c_data:
1557 for vrf in c_data["vrfs"]:
1558 name = vrf.setdefault("name", None)
1559 table_id = vrf.setdefault("id", None)
1560 del_action = vrf.setdefault("delete", False)
1561
1562 if del_action:
1563 # Kernel cmd- Add VRF and table
1564 cmd = "ip link del {} type vrf table {}".format(
1565 vrf["name"], vrf["id"]
1566 )
1567
1568 logger.debug(
1569 "[DUT: %s]: Running kernel cmd [%s]", c_router, cmd
1570 )
1571 rnode.run(cmd)
1572
1573 # Kernel cmd - Bring down VRF
1574 cmd = "ip link set dev {} down".format(name)
1575 logger.debug(
1576 "[DUT: %s]: Running kernel cmd [%s]", c_router, cmd
1577 )
1578 rnode.run(cmd)
1579
1580 else:
1581 if name and table_id:
1582 # Kernel cmd- Add VRF and table
1583 cmd = "ip link add {} type vrf table {}".format(
1584 name, table_id
1585 )
1586 logger.debug(
1587 "[DUT: %s]: Running kernel cmd " "[%s]", c_router, cmd
1588 )
1589 rnode.run(cmd)
1590
1591 # Kernel cmd - Bring up VRF
1592 cmd = "ip link set dev {} up".format(name)
1593 logger.debug(
1594 "[DUT: %s]: Running kernel " "cmd [%s]", c_router, cmd
1595 )
1596 rnode.run(cmd)
1597
1598 for vrf in c_data["vrfs"]:
1599 vni = vrf.setdefault("vni", None)
1600 del_vni = vrf.setdefault("no_vni", None)
1601
1602 if "links" in c_data:
1603 for destRouterLink, data in sorted(c_data["links"].items()):
1604 # Loopback interfaces
1605 if "type" in data and data["type"] == "loopback":
1606 interface_name = destRouterLink
1607 else:
1608 interface_name = data["interface"]
1609
1610 if "vrf" in data:
1611 vrf_list = data["vrf"]
1612
1613 if type(vrf_list) is not list:
1614 vrf_list = [vrf_list]
1615
1616 for _vrf in vrf_list:
1617 cmd = "ip link set {} master {}".format(
1618 interface_name, _vrf
1619 )
1620
1621 logger.debug(
1622 "[DUT: %s]: Running" " kernel cmd [%s]",
1623 c_router,
1624 cmd,
1625 )
1626 rnode.run(cmd)
1627
1628 if vni:
1629 config_data.append("vrf {}".format(vrf["name"]))
1630 cmd = "vni {}".format(vni)
1631 config_data.append(cmd)
1632
1633 if del_vni:
1634 config_data.append("vrf {}".format(vrf["name"]))
1635 cmd = "no vni {}".format(del_vni)
1636 config_data.append(cmd)
1637
1638 if config_data:
1639 config_data_dict[c_router] = config_data
1640
1641 result = create_common_configurations(
1642 tgen, config_data_dict, "vrf", build=build
1643 )
1644
1645 except InvalidCLIError:
1646 # Traceback
1647 errormsg = traceback.format_exc()
1648 logger.error(errormsg)
1649 return errormsg
1650
1651 return result
1652
1653
1654 def create_interface_in_kernel(
1655 tgen, dut, name, ip_addr, vrf=None, netmask=None, create=True
1656 ):
1657 """
1658 Cretae interfaces in kernel for ipv4/ipv6
1659 Config is done in Linux Kernel:
1660
1661 Parameters
1662 ----------
1663 * `tgen` : Topogen object
1664 * `dut` : Device for which interfaces to be added
1665 * `name` : interface name
1666 * `ip_addr` : ip address for interface
1667 * `vrf` : VRF name, to which interface will be associated
1668 * `netmask` : netmask value, default is None
1669 * `create`: Create interface in kernel, if created then no need
1670 to create
1671 """
1672
1673 rnode = tgen.gears[dut]
1674
1675 if create:
1676 cmd = "ip link show {0} >/dev/null || ip link add {0} type dummy".format(name)
1677 rnode.run(cmd)
1678
1679 if not netmask:
1680 ifaddr = ipaddress.ip_interface(frr_unicode(ip_addr))
1681 else:
1682 ifaddr = ipaddress.ip_interface(
1683 "{}/{}".format(frr_unicode(ip_addr), frr_unicode(netmask))
1684 )
1685 cmd = "ip -{0} a flush {1} scope global && ip a add {2} dev {1} && ip l set {1} up".format(
1686 ifaddr.version, name, ifaddr
1687 )
1688 logger.debug("[DUT: %s]: Running command: %s", dut, cmd)
1689 rnode.run(cmd)
1690
1691 if vrf:
1692 cmd = "ip link set {} master {}".format(name, vrf)
1693 rnode.run(cmd)
1694
1695
1696 def shutdown_bringup_interface_in_kernel(tgen, dut, intf_name, ifaceaction=False):
1697 """
1698 Cretae interfaces in kernel for ipv4/ipv6
1699 Config is done in Linux Kernel:
1700
1701 Parameters
1702 ----------
1703 * `tgen` : Topogen object
1704 * `dut` : Device for which interfaces to be added
1705 * `intf_name` : interface name
1706 * `ifaceaction` : False to shutdown and True to bringup the
1707 ineterface
1708 """
1709
1710 rnode = tgen.gears[dut]
1711
1712 cmd = "ip link set dev"
1713 if ifaceaction:
1714 action = "up"
1715 cmd = "{} {} {}".format(cmd, intf_name, action)
1716 else:
1717 action = "down"
1718 cmd = "{} {} {}".format(cmd, intf_name, action)
1719
1720 logger.debug("[DUT: %s]: Running command: %s", dut, cmd)
1721 rnode.run(cmd)
1722
1723
1724 def validate_ip_address(ip_address):
1725 """
1726 Validates the type of ip address
1727 Parameters
1728 ----------
1729 * `ip_address`: IPv4/IPv6 address
1730 Returns
1731 -------
1732 Type of address as string
1733 """
1734
1735 if "/" in ip_address:
1736 ip_address = ip_address.split("/")[0]
1737
1738 v4 = True
1739 v6 = True
1740 try:
1741 socket.inet_aton(ip_address)
1742 except socket.error as error:
1743 logger.debug("Not a valid IPv4 address")
1744 v4 = False
1745 else:
1746 return "ipv4"
1747
1748 try:
1749 socket.inet_pton(socket.AF_INET6, ip_address)
1750 except socket.error as error:
1751 logger.debug("Not a valid IPv6 address")
1752 v6 = False
1753 else:
1754 return "ipv6"
1755
1756 if not v4 and not v6:
1757 raise Exception(
1758 "InvalidIpAddr", "%s is neither valid IPv4 or IPv6" " address" % ip_address
1759 )
1760
1761
1762 def check_address_types(addr_type=None):
1763 """
1764 Checks environment variable set and compares with the current address type
1765 """
1766
1767 addr_types_env = os.environ.get("ADDRESS_TYPES")
1768 if not addr_types_env:
1769 addr_types_env = "dual"
1770
1771 if addr_types_env == "dual":
1772 addr_types = ["ipv4", "ipv6"]
1773 elif addr_types_env == "ipv4":
1774 addr_types = ["ipv4"]
1775 elif addr_types_env == "ipv6":
1776 addr_types = ["ipv6"]
1777
1778 if addr_type is None:
1779 return addr_types
1780
1781 if addr_type not in addr_types:
1782 logger.debug(
1783 "{} not in supported/configured address types {}".format(
1784 addr_type, addr_types
1785 )
1786 )
1787 return False
1788
1789 return True
1790
1791
1792 def generate_ips(network, no_of_ips):
1793 """
1794 Returns list of IPs.
1795 based on start_ip and no_of_ips
1796
1797 * `network` : from here the ip will start generating,
1798 start_ip will be
1799 * `no_of_ips` : these many IPs will be generated
1800 """
1801 ipaddress_list = []
1802 if type(network) is not list:
1803 network = [network]
1804
1805 for start_ipaddr in network:
1806 if "/" in start_ipaddr:
1807 start_ip = start_ipaddr.split("/")[0]
1808 mask = int(start_ipaddr.split("/")[1])
1809 else:
1810 logger.debug("start_ipaddr {} must have a / in it".format(start_ipaddr))
1811 assert 0
1812
1813 addr_type = validate_ip_address(start_ip)
1814 if addr_type == "ipv4":
1815 if start_ip == "0.0.0.0" and mask == 0 and no_of_ips == 1:
1816 ipaddress_list.append("{}/{}".format(start_ip, mask))
1817 return ipaddress_list
1818 start_ip = ipaddress.IPv4Address(frr_unicode(start_ip))
1819 step = 2 ** (32 - mask)
1820 elif addr_type == "ipv6":
1821 if start_ip == "0::0" and mask == 0 and no_of_ips == 1:
1822 ipaddress_list.append("{}/{}".format(start_ip, mask))
1823 return ipaddress_list
1824 start_ip = ipaddress.IPv6Address(frr_unicode(start_ip))
1825 step = 2 ** (128 - mask)
1826 else:
1827 return []
1828
1829 next_ip = start_ip
1830 count = 0
1831 while count < no_of_ips:
1832 ipaddress_list.append("{}/{}".format(next_ip, mask))
1833 if addr_type == "ipv6":
1834 next_ip = ipaddress.IPv6Address(int(next_ip) + step)
1835 else:
1836 next_ip += step
1837 count += 1
1838
1839 return ipaddress_list
1840
1841
1842 def find_interface_with_greater_ip(topo, router, loopback=True, interface=True):
1843 """
1844 Returns highest interface ip for ipv4/ipv6. If loopback is there then
1845 it will return highest IP from loopback IPs otherwise from physical
1846 interface IPs.
1847 * `topo` : json file data
1848 * `router` : router for which highest interface should be calculated
1849 """
1850
1851 link_data = topo["routers"][router]["links"]
1852 lo_list = []
1853 interfaces_list = []
1854 lo_exists = False
1855 for destRouterLink, data in sorted(link_data.items()):
1856 if loopback:
1857 if "type" in data and data["type"] == "loopback":
1858 lo_exists = True
1859 ip_address = topo["routers"][router]["links"][destRouterLink][
1860 "ipv4"
1861 ].split("/")[0]
1862 lo_list.append(ip_address)
1863 if interface:
1864 ip_address = topo["routers"][router]["links"][destRouterLink]["ipv4"].split(
1865 "/"
1866 )[0]
1867 interfaces_list.append(ip_address)
1868
1869 if lo_exists:
1870 return sorted(lo_list)[-1]
1871
1872 return sorted(interfaces_list)[-1]
1873
1874
1875 def write_test_header(tc_name):
1876 """Display message at beginning of test case"""
1877 count = 20
1878 logger.info("*" * (len(tc_name) + count))
1879 step("START -> Testcase : %s" % tc_name, reset=True)
1880 logger.info("*" * (len(tc_name) + count))
1881
1882
1883 def write_test_footer(tc_name):
1884 """Display message at end of test case"""
1885 count = 21
1886 logger.info("=" * (len(tc_name) + count))
1887 logger.info("Testcase : %s -> PASSED", tc_name)
1888 logger.info("=" * (len(tc_name) + count))
1889
1890
1891 def interface_status(tgen, topo, input_dict):
1892 """
1893 Delete ip route maps from device
1894 * `tgen` : Topogen object
1895 * `topo` : json file data
1896 * `input_dict` : for which router, route map has to be deleted
1897 Usage
1898 -----
1899 input_dict = {
1900 "r3": {
1901 "interface_list": ['eth1-r1-r2', 'eth2-r1-r3'],
1902 "status": "down"
1903 }
1904 }
1905 Returns
1906 -------
1907 errormsg(str) or True
1908 """
1909 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
1910
1911 try:
1912 rlist = []
1913
1914 for router in input_dict.keys():
1915 interface_list = input_dict[router]["interface_list"]
1916 status = input_dict[router].setdefault("status", "up")
1917 for intf in interface_list:
1918 rnode = tgen.gears[router]
1919 interface_set_status(rnode, intf, status)
1920
1921 rlist.append(router)
1922
1923 # Load config to routers
1924 load_config_to_routers(tgen, rlist)
1925
1926 except Exception as e:
1927 errormsg = traceback.format_exc()
1928 logger.error(errormsg)
1929 return errormsg
1930
1931 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
1932 return True
1933
1934
1935 def retry(retry_timeout, initial_wait=0, expected=True, diag_pct=0.75):
1936 """
1937 Fixture: Retries function while it's return value is an errormsg (str), False, or it raises an exception.
1938
1939 * `retry_timeout`: Retry for at least this many seconds; after waiting initial_wait seconds
1940 * `initial_wait`: Sleeps for this many seconds before first executing function
1941 * `expected`: if False then the return logic is inverted, except for exceptions,
1942 (i.e., a False or errmsg (str) function return ends the retry loop,
1943 and returns that False or str value)
1944 * `diag_pct`: Percentage of `retry_timeout` to keep testing after negative result would have
1945 been returned in order to see if a positive result comes after. This is an
1946 important diagnostic tool, and normally should not be disabled. Calls to wrapped
1947 functions though, can override the `diag_pct` value to make it larger in case more
1948 diagnostic retrying is appropriate.
1949 """
1950
1951 def _retry(func):
1952 @wraps(func)
1953 def func_retry(*args, **kwargs):
1954 # We will continue to retry diag_pct of the timeout value to see if test would have passed with a
1955 # longer retry timeout value.
1956 saved_failure = None
1957
1958 retry_sleep = 2
1959
1960 # Allow the wrapped function's args to override the fixtures
1961 _retry_timeout = kwargs.pop("retry_timeout", retry_timeout)
1962 _expected = kwargs.pop("expected", expected)
1963 _initial_wait = kwargs.pop("initial_wait", initial_wait)
1964 _diag_pct = kwargs.pop("diag_pct", diag_pct)
1965
1966 start_time = datetime.now()
1967 retry_until = datetime.now() + timedelta(
1968 seconds=_retry_timeout + _initial_wait
1969 )
1970
1971 if initial_wait > 0:
1972 logger.debug("Waiting for [%s]s as initial delay", initial_wait)
1973 sleep(initial_wait)
1974
1975 invert_logic = not _expected
1976 while True:
1977 seconds_left = (retry_until - datetime.now()).total_seconds()
1978 try:
1979 ret = func(*args, **kwargs)
1980 logger.debug("Function returned %s", ret)
1981
1982 negative_result = ret is False or is_string(ret)
1983 if negative_result == invert_logic:
1984 # Simple case, successful result in time
1985 if not saved_failure:
1986 return ret
1987
1988 # Positive result, but happened after timeout failure, very important to
1989 # note for fixing tests.
1990 logger.warning(
1991 "RETRY DIAGNOSTIC: SUCCEED after FAILED with requested timeout of %.1fs; however, succeeded in %.1fs, investigate timeout timing",
1992 _retry_timeout,
1993 (datetime.now() - start_time).total_seconds(),
1994 )
1995 if isinstance(saved_failure, Exception):
1996 raise saved_failure # pylint: disable=E0702
1997 return saved_failure
1998
1999 except Exception as error:
2000 logger.info("Function raised exception: %s", str(error))
2001 ret = error
2002
2003 if seconds_left < 0 and saved_failure:
2004 logger.info(
2005 "RETRY DIAGNOSTIC: Retry timeout reached, still failing"
2006 )
2007 if isinstance(saved_failure, Exception):
2008 raise saved_failure # pylint: disable=E0702
2009 return saved_failure
2010
2011 if seconds_left < 0:
2012 logger.info("Retry timeout of %ds reached", _retry_timeout)
2013
2014 saved_failure = ret
2015 retry_extra_delta = timedelta(
2016 seconds=seconds_left + _retry_timeout * _diag_pct
2017 )
2018 retry_until = datetime.now() + retry_extra_delta
2019 seconds_left = retry_extra_delta.total_seconds()
2020
2021 # Generate bundle after setting remaining diagnostic retry time
2022 generate_support_bundle()
2023
2024 # If user has disabled diagnostic retries return now
2025 if not _diag_pct:
2026 if isinstance(saved_failure, Exception):
2027 raise saved_failure
2028 return saved_failure
2029
2030 if saved_failure:
2031 logger.debug(
2032 "RETRY DIAG: [failure] Sleeping %ds until next retry with %.1f retry time left - too see if timeout was too short",
2033 retry_sleep,
2034 seconds_left,
2035 )
2036 else:
2037 logger.debug(
2038 "Sleeping %ds until next retry with %.1f retry time left",
2039 retry_sleep,
2040 seconds_left,
2041 )
2042 sleep(retry_sleep)
2043
2044 func_retry._original = func
2045 return func_retry
2046
2047 return _retry
2048
2049
2050 class Stepper:
2051 """
2052 Prints step number for the test case step being executed
2053 """
2054
2055 count = 1
2056
2057 def __call__(self, msg, reset):
2058 if reset:
2059 Stepper.count = 1
2060 logger.info(msg)
2061 else:
2062 logger.info("STEP %s: '%s'", Stepper.count, msg)
2063 Stepper.count += 1
2064
2065
2066 def step(msg, reset=False):
2067 """
2068 Call Stepper to print test steps. Need to reset at the beginning of test.
2069 * ` msg` : Step message body.
2070 * `reset` : Reset step count to 1 when set to True.
2071 """
2072 _step = Stepper()
2073 _step(msg, reset)
2074
2075
2076 def do_countdown(secs):
2077 """
2078 Countdown timer display
2079 """
2080 for i in range(secs, 0, -1):
2081 sys.stdout.write("{} ".format(str(i)))
2082 sys.stdout.flush()
2083 sleep(1)
2084 return
2085
2086
2087 #############################################
2088 # These APIs, will used by testcase
2089 #############################################
2090 def create_interfaces_cfg(tgen, topo, build=False):
2091 """
2092 Create interface configuration for created topology. Basic Interface
2093 configuration is provided in input json file.
2094
2095 Parameters
2096 ----------
2097 * `tgen` : Topogen object
2098 * `topo` : json file data
2099 * `build` : Only for initial setup phase this is set as True.
2100
2101 Returns
2102 -------
2103 True or False
2104 """
2105
2106 def _create_interfaces_ospf_cfg(ospf, c_data, data, ospf_keywords):
2107 interface_data = []
2108 ip_ospf = "ipv6 ospf6" if ospf == "ospf6" else "ip ospf"
2109 for keyword in ospf_keywords:
2110 if keyword in data[ospf]:
2111 intf_ospf_value = c_data["links"][destRouterLink][ospf][keyword]
2112 if "delete" in data and data["delete"]:
2113 interface_data.append(
2114 "no {} {}".format(ip_ospf, keyword.replace("_", "-"))
2115 )
2116 else:
2117 interface_data.append(
2118 "{} {} {}".format(
2119 ip_ospf, keyword.replace("_", "-"), intf_ospf_value
2120 )
2121 )
2122 return interface_data
2123
2124 result = False
2125 topo = deepcopy(topo)
2126
2127 try:
2128 interface_data_dict = {}
2129
2130 for c_router, c_data in topo.items():
2131 interface_data = []
2132 for destRouterLink, data in sorted(c_data["links"].items()):
2133 # Loopback interfaces
2134 if "type" in data and data["type"] == "loopback":
2135 interface_name = destRouterLink
2136 else:
2137 interface_name = data["interface"]
2138
2139 interface_data.append("interface {}".format(str(interface_name)))
2140
2141 if "ipv4" in data:
2142 intf_addr = c_data["links"][destRouterLink]["ipv4"]
2143
2144 if "delete" in data and data["delete"]:
2145 interface_data.append("no ip address {}".format(intf_addr))
2146 else:
2147 interface_data.append("ip address {}".format(intf_addr))
2148 if "ipv6" in data:
2149 intf_addr = c_data["links"][destRouterLink]["ipv6"]
2150
2151 if "delete" in data and data["delete"]:
2152 interface_data.append("no ipv6 address {}".format(intf_addr))
2153 else:
2154 interface_data.append("ipv6 address {}".format(intf_addr))
2155
2156 # Wait for vrf interfaces to get link local address once they are up
2157 if (
2158 not destRouterLink == "lo"
2159 and "vrf" in topo[c_router]["links"][destRouterLink]
2160 ):
2161 vrf = topo[c_router]["links"][destRouterLink]["vrf"]
2162 intf = topo[c_router]["links"][destRouterLink]["interface"]
2163 ll = get_frr_ipv6_linklocal(tgen, c_router, intf=intf, vrf=vrf)
2164
2165 if "ipv6-link-local" in data:
2166 intf_addr = c_data["links"][destRouterLink]["ipv6-link-local"]
2167
2168 if "delete" in data and data["delete"]:
2169 interface_data.append("no ipv6 address {}".format(intf_addr))
2170 else:
2171 interface_data.append("ipv6 address {}\n".format(intf_addr))
2172
2173 ospf_keywords = [
2174 "hello_interval",
2175 "dead_interval",
2176 "network",
2177 "priority",
2178 "cost",
2179 "mtu_ignore",
2180 ]
2181 if "ospf" in data:
2182 interface_data += _create_interfaces_ospf_cfg(
2183 "ospf", c_data, data, ospf_keywords + ["area"]
2184 )
2185 if "ospf6" in data:
2186 interface_data += _create_interfaces_ospf_cfg(
2187 "ospf6", c_data, data, ospf_keywords + ["area"]
2188 )
2189 interface_data.append("exit")
2190 if interface_data:
2191 interface_data_dict[c_router] = interface_data
2192
2193 result = create_common_configurations(
2194 tgen, interface_data_dict, "interface_config", build=build
2195 )
2196
2197 except InvalidCLIError:
2198 # Traceback
2199 errormsg = traceback.format_exc()
2200 logger.error(errormsg)
2201 return errormsg
2202
2203 return result
2204
2205
2206 def create_static_routes(tgen, input_dict, build=False):
2207 """
2208 Create static routes for given router as defined in input_dict
2209
2210 Parameters
2211 ----------
2212 * `tgen` : Topogen object
2213 * `input_dict` : Input dict data, required when configuring from testcase
2214 * `build` : Only for initial setup phase this is set as True.
2215
2216 Usage
2217 -----
2218 input_dict should be in the format below:
2219 # static_routes: list of all routes
2220 # network: network address
2221 # no_of_ip: number of next-hop address that will be configured
2222 # admin_distance: admin distance for route/routes.
2223 # next_hop: starting next-hop address
2224 # tag: tag id for static routes
2225 # vrf: VRF name in which static routes needs to be created
2226 # delete: True if config to be removed. Default False.
2227
2228 Example:
2229 "routers": {
2230 "r1": {
2231 "static_routes": [
2232 {
2233 "network": "100.0.20.1/32",
2234 "no_of_ip": 9,
2235 "admin_distance": 100,
2236 "next_hop": "10.0.0.1",
2237 "tag": 4001,
2238 "vrf": "RED_A"
2239 "delete": true
2240 }
2241 ]
2242 }
2243 }
2244
2245 Returns
2246 -------
2247 errormsg(str) or True
2248 """
2249 result = False
2250 logger.debug("Entering lib API: create_static_routes()")
2251 input_dict = deepcopy(input_dict)
2252
2253 try:
2254 static_routes_list_dict = {}
2255
2256 for router in input_dict.keys():
2257 if "static_routes" not in input_dict[router]:
2258 errormsg = "static_routes not present in input_dict"
2259 logger.info(errormsg)
2260 continue
2261
2262 static_routes_list = []
2263
2264 static_routes = input_dict[router]["static_routes"]
2265 for static_route in static_routes:
2266 del_action = static_route.setdefault("delete", False)
2267 no_of_ip = static_route.setdefault("no_of_ip", 1)
2268 network = static_route.setdefault("network", [])
2269 if type(network) is not list:
2270 network = [network]
2271
2272 admin_distance = static_route.setdefault("admin_distance", None)
2273 tag = static_route.setdefault("tag", None)
2274 vrf = static_route.setdefault("vrf", None)
2275 interface = static_route.setdefault("interface", None)
2276 next_hop = static_route.setdefault("next_hop", None)
2277 nexthop_vrf = static_route.setdefault("nexthop_vrf", None)
2278
2279 ip_list = generate_ips(network, no_of_ip)
2280 for ip in ip_list:
2281 addr_type = validate_ip_address(ip)
2282
2283 if addr_type == "ipv4":
2284 cmd = "ip route {}".format(ip)
2285 else:
2286 cmd = "ipv6 route {}".format(ip)
2287
2288 if interface:
2289 cmd = "{} {}".format(cmd, interface)
2290
2291 if next_hop:
2292 cmd = "{} {}".format(cmd, next_hop)
2293
2294 if nexthop_vrf:
2295 cmd = "{} nexthop-vrf {}".format(cmd, nexthop_vrf)
2296
2297 if vrf:
2298 cmd = "{} vrf {}".format(cmd, vrf)
2299
2300 if tag:
2301 cmd = "{} tag {}".format(cmd, str(tag))
2302
2303 if admin_distance:
2304 cmd = "{} {}".format(cmd, admin_distance)
2305
2306 if del_action:
2307 cmd = "no {}".format(cmd)
2308
2309 static_routes_list.append(cmd)
2310
2311 if static_routes_list:
2312 static_routes_list_dict[router] = static_routes_list
2313
2314 result = create_common_configurations(
2315 tgen, static_routes_list_dict, "static_route", build=build
2316 )
2317
2318 except InvalidCLIError:
2319 # Traceback
2320 errormsg = traceback.format_exc()
2321 logger.error(errormsg)
2322 return errormsg
2323
2324 logger.debug("Exiting lib API: create_static_routes()")
2325 return result
2326
2327
2328 def create_prefix_lists(tgen, input_dict, build=False):
2329 """
2330 Create ip prefix lists as per the config provided in input
2331 JSON or input_dict
2332 Parameters
2333 ----------
2334 * `tgen` : Topogen object
2335 * `input_dict` : Input dict data, required when configuring from testcase
2336 * `build` : Only for initial setup phase this is set as True.
2337 Usage
2338 -----
2339 # pf_lists_1: name of prefix-list, user defined
2340 # seqid: prefix-list seqid, auto-generated if not given by user
2341 # network: criteria for applying prefix-list
2342 # action: permit/deny
2343 # le: less than or equal number of bits
2344 # ge: greater than or equal number of bits
2345 Example
2346 -------
2347 input_dict = {
2348 "r1": {
2349 "prefix_lists":{
2350 "ipv4": {
2351 "pf_list_1": [
2352 {
2353 "seqid": 10,
2354 "network": "any",
2355 "action": "permit",
2356 "le": "32",
2357 "ge": "30",
2358 "delete": True
2359 }
2360 ]
2361 }
2362 }
2363 }
2364 }
2365 Returns
2366 -------
2367 errormsg or True
2368 """
2369
2370 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
2371 result = False
2372 try:
2373 config_data_dict = {}
2374
2375 for router in input_dict.keys():
2376 if "prefix_lists" not in input_dict[router]:
2377 errormsg = "prefix_lists not present in input_dict"
2378 logger.debug(errormsg)
2379 continue
2380
2381 config_data = []
2382 prefix_lists = input_dict[router]["prefix_lists"]
2383 for addr_type, prefix_data in prefix_lists.items():
2384 if not check_address_types(addr_type):
2385 continue
2386
2387 for prefix_name, prefix_list in prefix_data.items():
2388 for prefix_dict in prefix_list:
2389 if "action" not in prefix_dict or "network" not in prefix_dict:
2390 errormsg = "'action' or network' missing in" " input_dict"
2391 return errormsg
2392
2393 network_addr = prefix_dict["network"]
2394 action = prefix_dict["action"]
2395 le = prefix_dict.setdefault("le", None)
2396 ge = prefix_dict.setdefault("ge", None)
2397 seqid = prefix_dict.setdefault("seqid", None)
2398 del_action = prefix_dict.setdefault("delete", False)
2399 if seqid is None:
2400 seqid = get_seq_id("prefix_lists", router, prefix_name)
2401 else:
2402 set_seq_id("prefix_lists", router, seqid, prefix_name)
2403
2404 if addr_type == "ipv4":
2405 protocol = "ip"
2406 else:
2407 protocol = "ipv6"
2408
2409 cmd = "{} prefix-list {} seq {} {} {}".format(
2410 protocol, prefix_name, seqid, action, network_addr
2411 )
2412 if le:
2413 cmd = "{} le {}".format(cmd, le)
2414 if ge:
2415 cmd = "{} ge {}".format(cmd, ge)
2416
2417 if del_action:
2418 cmd = "no {}".format(cmd)
2419
2420 config_data.append(cmd)
2421 if config_data:
2422 config_data_dict[router] = config_data
2423
2424 result = create_common_configurations(
2425 tgen, config_data_dict, "prefix_list", build=build
2426 )
2427
2428 except InvalidCLIError:
2429 # Traceback
2430 errormsg = traceback.format_exc()
2431 logger.error(errormsg)
2432 return errormsg
2433
2434 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
2435 return result
2436
2437
2438 def create_route_maps(tgen, input_dict, build=False):
2439 """
2440 Create route-map on the devices as per the arguments passed
2441 Parameters
2442 ----------
2443 * `tgen` : Topogen object
2444 * `input_dict` : Input dict data, required when configuring from testcase
2445 * `build` : Only for initial setup phase this is set as True.
2446 Usage
2447 -----
2448 # route_maps: key, value pair for route-map name and its attribute
2449 # rmap_match_prefix_list_1: user given name for route-map
2450 # action: PERMIT/DENY
2451 # match: key,value pair for match criteria. prefix_list, community-list,
2452 large-community-list or tag. Only one option at a time.
2453 # prefix_list: name of prefix list
2454 # large-community-list: name of large community list
2455 # community-ist: name of community list
2456 # tag: tag id for static routes
2457 # set: key, value pair for modifying route attributes
2458 # localpref: preference value for the network
2459 # med: metric value advertised for AS
2460 # aspath: set AS path value
2461 # weight: weight for the route
2462 # community: standard community value to be attached
2463 # large_community: large community value to be attached
2464 # community_additive: if set to "additive", adds community/large-community
2465 value to the existing values of the network prefix
2466 Example:
2467 --------
2468 input_dict = {
2469 "r1": {
2470 "route_maps": {
2471 "rmap_match_prefix_list_1": [
2472 {
2473 "action": "PERMIT",
2474 "match": {
2475 "ipv4": {
2476 "prefix_list": "pf_list_1"
2477 }
2478 "ipv6": {
2479 "prefix_list": "pf_list_1"
2480 }
2481 "large-community-list": {
2482 "id": "community_1",
2483 "exact_match": True
2484 }
2485 "community_list": {
2486 "id": "community_2",
2487 "exact_match": True
2488 }
2489 "tag": "tag_id"
2490 },
2491 "set": {
2492 "locPrf": 150,
2493 "metric": 30,
2494 "path": {
2495 "num": 20000,
2496 "action": "prepend",
2497 },
2498 "weight": 500,
2499 "community": {
2500 "num": "1:2 2:3",
2501 "action": additive
2502 }
2503 "large_community": {
2504 "num": "1:2:3 4:5;6",
2505 "action": additive
2506 },
2507 }
2508 }
2509 ]
2510 }
2511 }
2512 }
2513 Returns
2514 -------
2515 errormsg(str) or True
2516 """
2517
2518 result = False
2519 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
2520 input_dict = deepcopy(input_dict)
2521 try:
2522 rmap_data_dict = {}
2523
2524 for router in input_dict.keys():
2525 if "route_maps" not in input_dict[router]:
2526 logger.debug("route_maps not present in input_dict")
2527 continue
2528 rmap_data = []
2529 for rmap_name, rmap_value in input_dict[router]["route_maps"].items():
2530 for rmap_dict in rmap_value:
2531 del_action = rmap_dict.setdefault("delete", False)
2532
2533 if del_action:
2534 rmap_data.append("no route-map {}".format(rmap_name))
2535 continue
2536
2537 if "action" not in rmap_dict:
2538 errormsg = "action not present in input_dict"
2539 logger.error(errormsg)
2540 return False
2541
2542 rmap_action = rmap_dict.setdefault("action", "deny")
2543
2544 seq_id = rmap_dict.setdefault("seq_id", None)
2545 if seq_id is None:
2546 seq_id = get_seq_id("route_maps", router, rmap_name)
2547 else:
2548 set_seq_id("route_maps", router, seq_id, rmap_name)
2549
2550 rmap_data.append(
2551 "route-map {} {} {}".format(rmap_name, rmap_action, seq_id)
2552 )
2553
2554 if "continue" in rmap_dict:
2555 continue_to = rmap_dict["continue"]
2556 if continue_to:
2557 rmap_data.append("on-match goto {}".format(continue_to))
2558 else:
2559 logger.error(
2560 "In continue, 'route-map entry "
2561 "sequence number' is not provided"
2562 )
2563 return False
2564
2565 if "goto" in rmap_dict:
2566 go_to = rmap_dict["goto"]
2567 if go_to:
2568 rmap_data.append("on-match goto {}".format(go_to))
2569 else:
2570 logger.error(
2571 "In goto, 'Goto Clause number' is not" " provided"
2572 )
2573 return False
2574
2575 if "call" in rmap_dict:
2576 call_rmap = rmap_dict["call"]
2577 if call_rmap:
2578 rmap_data.append("call {}".format(call_rmap))
2579 else:
2580 logger.error(
2581 "In call, 'destination Route-Map' is" " not provided"
2582 )
2583 return False
2584
2585 # Verifying if SET criteria is defined
2586 if "set" in rmap_dict:
2587 set_data = rmap_dict["set"]
2588 ipv4_data = set_data.setdefault("ipv4", {})
2589 ipv6_data = set_data.setdefault("ipv6", {})
2590 local_preference = set_data.setdefault("locPrf", None)
2591 metric = set_data.setdefault("metric", None)
2592 metric_type = set_data.setdefault("metric-type", None)
2593 as_path = set_data.setdefault("path", {})
2594 weight = set_data.setdefault("weight", None)
2595 community = set_data.setdefault("community", {})
2596 large_community = set_data.setdefault("large_community", {})
2597 large_comm_list = set_data.setdefault("large_comm_list", {})
2598 set_action = set_data.setdefault("set_action", None)
2599 nexthop = set_data.setdefault("nexthop", None)
2600 origin = set_data.setdefault("origin", None)
2601 ext_comm_list = set_data.setdefault("extcommunity", {})
2602 metrictype = set_data.setdefault("metric-type", None)
2603
2604 # Local Preference
2605 if local_preference:
2606 rmap_data.append(
2607 "set local-preference {}".format(local_preference)
2608 )
2609
2610 # Metric-Type
2611 if metrictype:
2612 rmap_data.append("set metric-type {}\n".format(metrictype))
2613
2614 # Metric
2615 if metric:
2616 del_comm = set_data.setdefault("delete", None)
2617 if del_comm:
2618 rmap_data.append("no set metric {}".format(metric))
2619 else:
2620 rmap_data.append("set metric {}".format(metric))
2621
2622 # Origin
2623 if origin:
2624 rmap_data.append("set origin {} \n".format(origin))
2625
2626 # AS Path Prepend
2627 if as_path:
2628 as_num = as_path.setdefault("as_num", None)
2629 as_action = as_path.setdefault("as_action", None)
2630 if as_action and as_num:
2631 rmap_data.append(
2632 "set as-path {} {}".format(as_action, as_num)
2633 )
2634
2635 # Community
2636 if community:
2637 num = community.setdefault("num", None)
2638 comm_action = community.setdefault("action", None)
2639 if num:
2640 cmd = "set community {}".format(num)
2641 if comm_action:
2642 cmd = "{} {}".format(cmd, comm_action)
2643 rmap_data.append(cmd)
2644 else:
2645 logger.error("In community, AS Num not" " provided")
2646 return False
2647
2648 if large_community:
2649 num = large_community.setdefault("num", None)
2650 comm_action = large_community.setdefault("action", None)
2651 if num:
2652 cmd = "set large-community {}".format(num)
2653 if comm_action:
2654 cmd = "{} {}".format(cmd, comm_action)
2655
2656 rmap_data.append(cmd)
2657 else:
2658 logger.error(
2659 "In large_community, AS Num not" " provided"
2660 )
2661 return False
2662 if large_comm_list:
2663 id = large_comm_list.setdefault("id", None)
2664 del_comm = large_comm_list.setdefault("delete", None)
2665 if id:
2666 cmd = "set large-comm-list {}".format(id)
2667 if del_comm:
2668 cmd = "{} delete".format(cmd)
2669
2670 rmap_data.append(cmd)
2671 else:
2672 logger.error("In large_comm_list 'id' not" " provided")
2673 return False
2674
2675 if ext_comm_list:
2676 rt = ext_comm_list.setdefault("rt", None)
2677 del_comm = ext_comm_list.setdefault("delete", None)
2678 if rt:
2679 cmd = "set extcommunity rt {}".format(rt)
2680 if del_comm:
2681 cmd = "{} delete".format(cmd)
2682
2683 rmap_data.append(cmd)
2684 else:
2685 logger.debug("In ext_comm_list 'rt' not" " provided")
2686 return False
2687
2688 # Weight
2689 if weight:
2690 rmap_data.append("set weight {}".format(weight))
2691 if ipv6_data:
2692 nexthop = ipv6_data.setdefault("nexthop", None)
2693 if nexthop:
2694 rmap_data.append("set ipv6 next-hop {}".format(nexthop))
2695
2696 # Adding MATCH and SET sequence to RMAP if defined
2697 if "match" in rmap_dict:
2698 match_data = rmap_dict["match"]
2699 ipv4_data = match_data.setdefault("ipv4", {})
2700 ipv6_data = match_data.setdefault("ipv6", {})
2701 community = match_data.setdefault("community_list", {})
2702 large_community = match_data.setdefault("large_community", {})
2703 large_community_list = match_data.setdefault(
2704 "large_community_list", {}
2705 )
2706
2707 metric = match_data.setdefault("metric", None)
2708 source_vrf = match_data.setdefault("source-vrf", None)
2709
2710 if ipv4_data:
2711 # fetch prefix list data from rmap
2712 prefix_name = ipv4_data.setdefault("prefix_lists", None)
2713 if prefix_name:
2714 rmap_data.append(
2715 "match ip address"
2716 " prefix-list {}".format(prefix_name)
2717 )
2718
2719 # fetch tag data from rmap
2720 tag = ipv4_data.setdefault("tag", None)
2721 if tag:
2722 rmap_data.append("match tag {}".format(tag))
2723
2724 # fetch large community data from rmap
2725 large_community_list = ipv4_data.setdefault(
2726 "large_community_list", {}
2727 )
2728 large_community = match_data.setdefault(
2729 "large_community", {}
2730 )
2731
2732 if ipv6_data:
2733 prefix_name = ipv6_data.setdefault("prefix_lists", None)
2734 if prefix_name:
2735 rmap_data.append(
2736 "match ipv6 address"
2737 " prefix-list {}".format(prefix_name)
2738 )
2739
2740 # fetch tag data from rmap
2741 tag = ipv6_data.setdefault("tag", None)
2742 if tag:
2743 rmap_data.append("match tag {}".format(tag))
2744
2745 # fetch large community data from rmap
2746 large_community_list = ipv6_data.setdefault(
2747 "large_community_list", {}
2748 )
2749 large_community = match_data.setdefault(
2750 "large_community", {}
2751 )
2752
2753 if community:
2754 if "id" not in community:
2755 logger.error(
2756 "'id' is mandatory for "
2757 "community-list in match"
2758 " criteria"
2759 )
2760 return False
2761 cmd = "match community {}".format(community["id"])
2762 exact_match = community.setdefault("exact_match", False)
2763 if exact_match:
2764 cmd = "{} exact-match".format(cmd)
2765
2766 rmap_data.append(cmd)
2767 if large_community:
2768 if "id" not in large_community:
2769 logger.error(
2770 "'id' is mandatory for "
2771 "large-community-list in match "
2772 "criteria"
2773 )
2774 return False
2775 cmd = "match large-community {}".format(
2776 large_community["id"]
2777 )
2778 exact_match = large_community.setdefault(
2779 "exact_match", False
2780 )
2781 if exact_match:
2782 cmd = "{} exact-match".format(cmd)
2783 rmap_data.append(cmd)
2784 if large_community_list:
2785 if "id" not in large_community_list:
2786 logger.error(
2787 "'id' is mandatory for "
2788 "large-community-list in match "
2789 "criteria"
2790 )
2791 return False
2792 cmd = "match large-community {}".format(
2793 large_community_list["id"]
2794 )
2795 exact_match = large_community_list.setdefault(
2796 "exact_match", False
2797 )
2798 if exact_match:
2799 cmd = "{} exact-match".format(cmd)
2800 rmap_data.append(cmd)
2801
2802 if source_vrf:
2803 cmd = "match source-vrf {}".format(source_vrf)
2804 rmap_data.append(cmd)
2805
2806 if metric:
2807 cmd = "match metric {}".format(metric)
2808 rmap_data.append(cmd)
2809
2810 if rmap_data:
2811 rmap_data_dict[router] = rmap_data
2812
2813 result = create_common_configurations(
2814 tgen, rmap_data_dict, "route_maps", build=build
2815 )
2816
2817 except InvalidCLIError:
2818 # Traceback
2819 errormsg = traceback.format_exc()
2820 logger.error(errormsg)
2821 return errormsg
2822
2823 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
2824 return result
2825
2826
2827 def delete_route_maps(tgen, input_dict):
2828 """
2829 Delete ip route maps from device
2830 * `tgen` : Topogen object
2831 * `input_dict` : for which router,
2832 route map has to be deleted
2833 Usage
2834 -----
2835 # Delete route-map rmap_1 and rmap_2 from router r1
2836 input_dict = {
2837 "r1": {
2838 "route_maps": ["rmap_1", "rmap__2"]
2839 }
2840 }
2841 result = delete_route_maps("ipv4", input_dict)
2842 Returns
2843 -------
2844 errormsg(str) or True
2845 """
2846 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
2847
2848 for router in input_dict.keys():
2849 route_maps = input_dict[router]["route_maps"][:]
2850 rmap_data = input_dict[router]
2851 rmap_data["route_maps"] = {}
2852 for route_map_name in route_maps:
2853 rmap_data["route_maps"].update({route_map_name: [{"delete": True}]})
2854
2855 return create_route_maps(tgen, input_dict)
2856
2857
2858 def create_bgp_community_lists(tgen, input_dict, build=False):
2859 """
2860 Create bgp community-list or large-community-list on the devices as per
2861 the arguments passed. Takes list of communities in input.
2862 Parameters
2863 ----------
2864 * `tgen` : Topogen object
2865 * `input_dict` : Input dict data, required when configuring from testcase
2866 * `build` : Only for initial setup phase this is set as True.
2867 Usage
2868 -----
2869 input_dict_1 = {
2870 "r3": {
2871 "bgp_community_lists": [
2872 {
2873 "community_type": "standard",
2874 "action": "permit",
2875 "name": "rmap_lcomm_{}".format(addr_type),
2876 "value": "1:1:1 1:2:3 2:1:1 2:2:2",
2877 "large": True
2878 }
2879 ]
2880 }
2881 }
2882 }
2883 result = create_bgp_community_lists(tgen, input_dict_1)
2884 """
2885
2886 result = False
2887 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
2888 input_dict = deepcopy(input_dict)
2889 try:
2890 config_data_dict = {}
2891
2892 for router in input_dict.keys():
2893 if "bgp_community_lists" not in input_dict[router]:
2894 errormsg = "bgp_community_lists not present in input_dict"
2895 logger.debug(errormsg)
2896 continue
2897
2898 config_data = []
2899
2900 community_list = input_dict[router]["bgp_community_lists"]
2901 for community_dict in community_list:
2902 del_action = community_dict.setdefault("delete", False)
2903 community_type = community_dict.setdefault("community_type", None)
2904 action = community_dict.setdefault("action", None)
2905 value = community_dict.setdefault("value", "")
2906 large = community_dict.setdefault("large", None)
2907 name = community_dict.setdefault("name", None)
2908 if large:
2909 cmd = "bgp large-community-list"
2910 else:
2911 cmd = "bgp community-list"
2912
2913 if not large and not (community_type and action and value):
2914 errormsg = (
2915 "community_type, action and value are "
2916 "required in bgp_community_list"
2917 )
2918 logger.error(errormsg)
2919 return False
2920
2921 cmd = "{} {} {} {} {}".format(cmd, community_type, name, action, value)
2922
2923 if del_action:
2924 cmd = "no {}".format(cmd)
2925
2926 config_data.append(cmd)
2927
2928 if config_data:
2929 config_data_dict[router] = config_data
2930
2931 result = create_common_configurations(
2932 tgen, config_data_dict, "bgp_community_list", build=build
2933 )
2934
2935 except InvalidCLIError:
2936 # Traceback
2937 errormsg = traceback.format_exc()
2938 logger.error(errormsg)
2939 return errormsg
2940
2941 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
2942 return result
2943
2944
2945 def shutdown_bringup_interface(tgen, dut, intf_name, ifaceaction=False):
2946 """
2947 Shutdown or bringup router's interface "
2948 * `tgen` : Topogen object
2949 * `dut` : Device under test
2950 * `intf_name` : Interface name to be shut/no shut
2951 * `ifaceaction` : Action, to shut/no shut interface,
2952 by default is False
2953 Usage
2954 -----
2955 dut = "r3"
2956 intf = "r3-r1-eth0"
2957 # Shut down interface
2958 shutdown_bringup_interface(tgen, dut, intf, False)
2959 # Bring up interface
2960 shutdown_bringup_interface(tgen, dut, intf, True)
2961 Returns
2962 -------
2963 errormsg(str) or True
2964 """
2965
2966 router_list = tgen.routers()
2967 if ifaceaction:
2968 logger.info("Bringing up interface {} : {}".format(dut, intf_name))
2969 else:
2970 logger.info("Shutting down interface {} : {}".format(dut, intf_name))
2971
2972 interface_set_status(router_list[dut], intf_name, ifaceaction)
2973
2974
2975 def addKernelRoute(
2976 tgen, router, intf, group_addr_range, next_hop=None, src=None, del_action=None
2977 ):
2978 """
2979 Add route to kernel
2980
2981 Parameters:
2982 -----------
2983 * `tgen` : Topogen object
2984 * `router`: router for which kernel routes needs to be added
2985 * `intf`: interface name, for which kernel routes needs to be added
2986 * `bindToAddress`: bind to <host>, an interface or multicast
2987 address
2988
2989 returns:
2990 --------
2991 errormsg or True
2992 """
2993
2994 logger.debug("Entering lib API: addKernelRoute()")
2995
2996 rnode = tgen.gears[router]
2997
2998 if type(group_addr_range) is not list:
2999 group_addr_range = [group_addr_range]
3000
3001 for grp_addr in group_addr_range:
3002 addr_type = validate_ip_address(grp_addr)
3003 if addr_type == "ipv4":
3004 if next_hop is not None:
3005 cmd = "ip route add {} via {}".format(grp_addr, next_hop)
3006 else:
3007 cmd = "ip route add {} dev {}".format(grp_addr, intf)
3008 if del_action:
3009 cmd = "ip route del {}".format(grp_addr)
3010 verify_cmd = "ip route"
3011 elif addr_type == "ipv6":
3012 if intf and src:
3013 cmd = "ip -6 route add {} dev {} src {}".format(grp_addr, intf, src)
3014 else:
3015 cmd = "ip -6 route add {} via {}".format(grp_addr, next_hop)
3016 verify_cmd = "ip -6 route"
3017 if del_action:
3018 cmd = "ip -6 route del {}".format(grp_addr)
3019
3020 logger.info("[DUT: {}]: Running command: [{}]".format(router, cmd))
3021 output = rnode.run(cmd)
3022
3023 def check_in_kernel(rnode, verify_cmd, grp_addr, router):
3024 # Verifying if ip route added to kernel
3025 errormsg = None
3026 result = rnode.run(verify_cmd)
3027 logger.debug("{}\n{}".format(verify_cmd, result))
3028 if "/" in grp_addr:
3029 ip, mask = grp_addr.split("/")
3030 if mask == "32" or mask == "128":
3031 grp_addr = ip
3032 else:
3033 mask = "32" if addr_type == "ipv4" else "128"
3034
3035 if not re_search(r"{}".format(grp_addr), result) and mask != "0":
3036 errormsg = (
3037 "[DUT: {}]: Kernal route is not added for group"
3038 " address {} Config output: {}".format(
3039 router, grp_addr, output
3040 )
3041 )
3042
3043 return errormsg
3044
3045 test_func = functools.partial(
3046 check_in_kernel, rnode, verify_cmd, grp_addr, router
3047 )
3048 (result, out) = topotest.run_and_expect(test_func, None, count=20, wait=1)
3049 assert result, out
3050
3051 logger.debug("Exiting lib API: addKernelRoute()")
3052 return True
3053
3054
3055 def configure_vxlan(tgen, input_dict):
3056 """
3057 Add and configure vxlan
3058
3059 * `tgen`: tgen object
3060 * `input_dict` : data for vxlan config
3061
3062 Usage:
3063 ------
3064 input_dict= {
3065 "dcg2":{
3066 "vxlan":[{
3067 "vxlan_name": "vxlan75100",
3068 "vxlan_id": "75100",
3069 "dstport": 4789,
3070 "local_addr": "120.0.0.1",
3071 "learning": "no",
3072 "delete": True
3073 }]
3074 }
3075 }
3076
3077 configure_vxlan(tgen, input_dict)
3078
3079 Returns:
3080 -------
3081 True or errormsg
3082
3083 """
3084
3085 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
3086
3087 router_list = tgen.routers()
3088 for dut in input_dict.keys():
3089 rnode = router_list[dut]
3090
3091 if "vxlan" in input_dict[dut]:
3092 for vxlan_dict in input_dict[dut]["vxlan"]:
3093 cmd = "ip link "
3094
3095 del_vxlan = vxlan_dict.setdefault("delete", None)
3096 vxlan_names = vxlan_dict.setdefault("vxlan_name", [])
3097 vxlan_ids = vxlan_dict.setdefault("vxlan_id", [])
3098 dstport = vxlan_dict.setdefault("dstport", None)
3099 local_addr = vxlan_dict.setdefault("local_addr", None)
3100 learning = vxlan_dict.setdefault("learning", None)
3101
3102 config_data = []
3103 if vxlan_names and vxlan_ids:
3104 for vxlan_name, vxlan_id in zip(vxlan_names, vxlan_ids):
3105 cmd = "ip link"
3106
3107 if del_vxlan:
3108 cmd = "{} del {} type vxlan id {}".format(
3109 cmd, vxlan_name, vxlan_id
3110 )
3111 else:
3112 cmd = "{} add {} type vxlan id {}".format(
3113 cmd, vxlan_name, vxlan_id
3114 )
3115
3116 if dstport:
3117 cmd = "{} dstport {}".format(cmd, dstport)
3118
3119 if local_addr:
3120 ip_cmd = "ip addr add {} dev {}".format(
3121 local_addr, vxlan_name
3122 )
3123 if del_vxlan:
3124 ip_cmd = "ip addr del {} dev {}".format(
3125 local_addr, vxlan_name
3126 )
3127
3128 config_data.append(ip_cmd)
3129
3130 cmd = "{} local {}".format(cmd, local_addr)
3131
3132 if learning == "no":
3133 cmd = "{} nolearning".format(cmd)
3134
3135 elif learning == "yes":
3136 cmd = "{} learning".format(cmd)
3137
3138 config_data.append(cmd)
3139
3140 try:
3141 for _cmd in config_data:
3142 logger.info("[DUT: %s]: Running command: %s", dut, _cmd)
3143 rnode.run(_cmd)
3144
3145 except InvalidCLIError:
3146 # Traceback
3147 errormsg = traceback.format_exc()
3148 logger.error(errormsg)
3149 return errormsg
3150
3151 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
3152
3153 return True
3154
3155
3156 def configure_brctl(tgen, topo, input_dict):
3157 """
3158 Add and configure brctl
3159
3160 * `tgen`: tgen object
3161 * `input_dict` : data for brctl config
3162
3163 Usage:
3164 ------
3165 input_dict= {
3166 dut:{
3167 "brctl": [{
3168 "brctl_name": "br100",
3169 "addvxlan": "vxlan75100",
3170 "vrf": "RED",
3171 "stp": "off"
3172 }]
3173 }
3174 }
3175
3176 configure_brctl(tgen, topo, input_dict)
3177
3178 Returns:
3179 -------
3180 True or errormsg
3181
3182 """
3183
3184 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
3185
3186 router_list = tgen.routers()
3187 for dut in input_dict.keys():
3188 rnode = router_list[dut]
3189
3190 if "brctl" in input_dict[dut]:
3191 for brctl_dict in input_dict[dut]["brctl"]:
3192 brctl_names = brctl_dict.setdefault("brctl_name", [])
3193 addvxlans = brctl_dict.setdefault("addvxlan", [])
3194 stp_values = brctl_dict.setdefault("stp", [])
3195 vrfs = brctl_dict.setdefault("vrf", [])
3196
3197 ip_cmd = "ip link set"
3198 for brctl_name, vxlan, vrf, stp in zip(
3199 brctl_names, addvxlans, vrfs, stp_values
3200 ):
3201 ip_cmd_list = []
3202 cmd = "ip link add name {} type bridge stp_state {}".format(
3203 brctl_name, stp
3204 )
3205
3206 logger.info("[DUT: %s]: Running command: %s", dut, cmd)
3207 rnode.run(cmd)
3208
3209 ip_cmd_list.append("{} up dev {}".format(ip_cmd, brctl_name))
3210
3211 if vxlan:
3212 cmd = "{} dev {} master {}".format(ip_cmd, vxlan, brctl_name)
3213
3214 logger.info("[DUT: %s]: Running command: %s", dut, cmd)
3215 rnode.run(cmd)
3216
3217 ip_cmd_list.append("{} up dev {}".format(ip_cmd, vxlan))
3218
3219 if vrf:
3220 ip_cmd_list.append(
3221 "{} dev {} master {}".format(ip_cmd, brctl_name, vrf)
3222 )
3223
3224 for intf_name, data in topo["routers"][dut]["links"].items():
3225 if "vrf" not in data:
3226 continue
3227
3228 if data["vrf"] == vrf:
3229 ip_cmd_list.append(
3230 "{} up dev {}".format(ip_cmd, data["interface"])
3231 )
3232
3233 try:
3234 for _ip_cmd in ip_cmd_list:
3235 logger.info("[DUT: %s]: Running command: %s", dut, _ip_cmd)
3236 rnode.run(_ip_cmd)
3237
3238 except InvalidCLIError:
3239 # Traceback
3240 errormsg = traceback.format_exc()
3241 logger.error(errormsg)
3242 return errormsg
3243
3244 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
3245 return True
3246
3247
3248 def configure_interface_mac(tgen, input_dict):
3249 """
3250 Add and configure brctl
3251
3252 * `tgen`: tgen object
3253 * `input_dict` : data for mac config
3254
3255 input_mac= {
3256 "edge1":{
3257 "br75100": "00:80:48:BA:d1:00,
3258 "br75200": "00:80:48:BA:d1:00
3259 }
3260 }
3261
3262 configure_interface_mac(tgen, input_mac)
3263
3264 Returns:
3265 -------
3266 True or errormsg
3267
3268 """
3269
3270 router_list = tgen.routers()
3271 for dut in input_dict.keys():
3272 rnode = router_list[dut]
3273
3274 for intf, mac in input_dict[dut].items():
3275 cmd = "ip link set {} address {}".format(intf, mac)
3276 logger.info("[DUT: %s]: Running command: %s", dut, cmd)
3277
3278 try:
3279 result = rnode.run(cmd)
3280 if len(result) != 0:
3281 return result
3282
3283 except InvalidCLIError:
3284 # Traceback
3285 errormsg = traceback.format_exc()
3286 logger.error(errormsg)
3287 return errormsg
3288
3289 return True
3290
3291
3292 def socat_send_mld_join(
3293 tgen,
3294 server,
3295 protocol_option,
3296 mld_groups,
3297 send_from_intf,
3298 send_from_intf_ip=None,
3299 port=12345,
3300 reuseaddr=True,
3301 ):
3302 """
3303 API to send MLD join using SOCAT tool
3304
3305 Parameters:
3306 -----------
3307 * `tgen` : Topogen object
3308 * `server`: iperf server, from where IGMP join would be sent
3309 * `protocol_option`: Protocol options, ex: UDP6-RECV
3310 * `mld_groups`: IGMP group for which join has to be sent
3311 * `send_from_intf`: Interface from which join would be sent
3312 * `send_from_intf_ip`: Interface IP, default is None
3313 * `port`: Port to be used, default is 12345
3314 * `reuseaddr`: True|False, bydefault True
3315
3316 returns:
3317 --------
3318 errormsg or True
3319 """
3320
3321 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
3322
3323 rnode = tgen.routers()[server]
3324 socat_args = "socat -u "
3325
3326 # UDP4/TCP4/UDP6/UDP6-RECV/UDP6-SEND
3327 if protocol_option:
3328 socat_args += "{}".format(protocol_option)
3329
3330 if port:
3331 socat_args += ":{},".format(port)
3332
3333 if reuseaddr:
3334 socat_args += "{},".format("reuseaddr")
3335
3336 # Group address range to cover
3337 if mld_groups:
3338 if not isinstance(mld_groups, list):
3339 mld_groups = [mld_groups]
3340
3341 for mld_group in mld_groups:
3342 socat_cmd = socat_args
3343 join_option = "ipv6-join-group"
3344
3345 if send_from_intf and not send_from_intf_ip:
3346 socat_cmd += "{}='[{}]:{}'".format(join_option, mld_group, send_from_intf)
3347 else:
3348 socat_cmd += "{}='[{}]:{}:[{}]'".format(
3349 join_option, mld_group, send_from_intf, send_from_intf_ip
3350 )
3351
3352 socat_cmd += " STDOUT"
3353
3354 socat_cmd += " &>{}/socat.logs &".format(tgen.logdir)
3355
3356 # Run socat command to send IGMP join
3357 logger.info("[DUT: {}]: Running command: [{}]".format(server, socat_cmd))
3358 output = rnode.run("set +m; {} echo $!".format(socat_cmd))
3359
3360 # Check if socat join process is running
3361 if output:
3362 pid = output.split()[0]
3363 rnode.run("touch /var/run/frr/socat_join.pid")
3364 rnode.run("echo %s >> /var/run/frr/socat_join.pid" % pid)
3365 else:
3366 errormsg = "Socat join is not sent for {}. Error {}".format(
3367 mld_group, output
3368 )
3369 logger.error(output)
3370 return errormsg
3371
3372 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
3373 return True
3374
3375
3376 def socat_send_pim6_traffic(
3377 tgen,
3378 server,
3379 protocol_option,
3380 mld_groups,
3381 send_from_intf,
3382 port=12345,
3383 multicast_hops=True,
3384 ):
3385 """
3386 API to send pim6 data taffic using SOCAT tool
3387
3388 Parameters:
3389 -----------
3390 * `tgen` : Topogen object
3391 * `server`: iperf server, from where IGMP join would be sent
3392 * `protocol_option`: Protocol options, ex: UDP6-RECV
3393 * `mld_groups`: MLD group for which join has to be sent
3394 * `send_from_intf`: Interface from which join would be sent
3395 * `port`: Port to be used, default is 12345
3396 * `multicast_hops`: multicast-hops count, default is 255
3397
3398 returns:
3399 --------
3400 errormsg or True
3401 """
3402
3403 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
3404
3405 rnode = tgen.routers()[server]
3406 socat_args = "socat -u STDIO "
3407
3408 # UDP4/TCP4/UDP6/UDP6-RECV/UDP6-SEND
3409 if protocol_option:
3410 socat_args += "'{}".format(protocol_option)
3411
3412 # Group address range to cover
3413 if mld_groups:
3414 if not isinstance(mld_groups, list):
3415 mld_groups = [mld_groups]
3416
3417 for mld_group in mld_groups:
3418 socat_cmd = socat_args
3419 if port:
3420 socat_cmd += ":[{}]:{},".format(mld_group, port)
3421
3422 if send_from_intf:
3423 socat_cmd += "interface={0},so-bindtodevice={0},".format(send_from_intf)
3424
3425 if multicast_hops:
3426 socat_cmd += "multicast-hops=255'"
3427
3428 socat_cmd += " >{}/socat.logs &".format(tgen.logdir)
3429
3430 # Run socat command to send pim6 traffic
3431 logger.info(
3432 "[DUT: {}]: Running command: [set +m; ( while sleep 1; do date; done ) | {}]".format(
3433 server, socat_cmd
3434 )
3435 )
3436
3437 # Open a shell script file and write data to it, which will be
3438 # used to send pim6 traffic continously
3439 traffic_shell_script = "{}/{}/traffic.sh".format(tgen.logdir, server)
3440 with open("{}".format(traffic_shell_script), "w") as taffic_sh:
3441 taffic_sh.write(
3442 "#!/usr/bin/env bash\n( while sleep 1; do date; done ) | {}\n".format(
3443 socat_cmd
3444 )
3445 )
3446
3447 rnode.run("chmod 755 {}".format(traffic_shell_script))
3448 output = rnode.run("{} &>/dev/null & echo $!".format(traffic_shell_script))
3449
3450 # Check if socat traffic process is running
3451 if output:
3452 pid = output.split()[0]
3453 rnode.run("touch /var/run/frr/socat_traffic.pid")
3454 rnode.run("echo %s >> /var/run/frr/socat_traffic.pid" % pid)
3455
3456 else:
3457 errormsg = "Socat traffic is not sent for {}. Error {}".format(
3458 mld_group, output
3459 )
3460 logger.error(output)
3461 return errormsg
3462
3463 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
3464 return True
3465
3466
3467 def kill_socat(tgen, dut=None, action=None):
3468 """
3469 Killing socat process if running for any router in topology
3470
3471 Parameters:
3472 -----------
3473 * `tgen` : Topogen object
3474 * `dut` : Any iperf hostname to send igmp prune
3475 * `action`: to kill mld join using socat
3476 to kill mld traffic using socat
3477
3478 Usage:
3479 ------
3480 kill_socat(tgen, dut ="i6", action="remove_mld_join")
3481
3482 """
3483
3484 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
3485
3486 router_list = tgen.routers()
3487 for router, rnode in router_list.items():
3488 if dut is not None and router != dut:
3489 continue
3490
3491 traffic_shell_script = "{}/{}/traffic.sh".format(tgen.logdir, router)
3492 pid_socat_join = rnode.run("cat /var/run/frr/socat_join.pid")
3493 pid_socat_traffic = rnode.run("cat /var/run/frr/socat_traffic.pid")
3494 if action == "remove_mld_join":
3495 pids = pid_socat_join
3496 elif action == "remove_mld_traffic":
3497 pids = pid_socat_traffic
3498 else:
3499 pids = "\n".join([pid_socat_join, pid_socat_traffic])
3500
3501 if os.path.exists(traffic_shell_script):
3502 cmd = (
3503 "ps -ef | grep %s | awk -F' ' '{print $2}' | xargs kill -9"
3504 % traffic_shell_script
3505 )
3506 logger.debug("[DUT: {}]: Running command: [{}]".format(router, cmd))
3507 rnode.run(cmd)
3508
3509 for pid in pids.split("\n"):
3510 pid = pid.strip()
3511 if pid.isdigit():
3512 cmd = "set +m; kill -9 %s &> /dev/null" % pid
3513 logger.debug("[DUT: {}]: Running command: [{}]".format(router, cmd))
3514 rnode.run(cmd)
3515
3516 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
3517
3518
3519 #############################################
3520 # Verification APIs
3521 #############################################
3522 @retry(retry_timeout=40)
3523 def verify_rib(
3524 tgen,
3525 addr_type,
3526 dut,
3527 input_dict,
3528 next_hop=None,
3529 protocol=None,
3530 tag=None,
3531 metric=None,
3532 fib=None,
3533 count_only=False,
3534 admin_distance=None,
3535 ):
3536 """
3537 Data will be read from input_dict or input JSON file, API will generate
3538 same prefixes, which were redistributed by either create_static_routes() or
3539 advertise_networks_using_network_command() and do will verify next_hop and
3540 each prefix/routes is present in "show ip/ipv6 route {bgp/stataic} json"
3541 command o/p.
3542
3543 Parameters
3544 ----------
3545 * `tgen` : topogen object
3546 * `addr_type` : ip type, ipv4/ipv6
3547 * `dut`: Device Under Test, for which user wants to test the data
3548 * `input_dict` : input dict, has details of static routes
3549 * `next_hop`[optional]: next_hop which needs to be verified,
3550 default: static
3551 * `protocol`[optional]: protocol, default = None
3552 * `count_only`[optional]: count of nexthops only, not specific addresses,
3553 default = False
3554
3555 Usage
3556 -----
3557 # RIB can be verified for static routes OR network advertised using
3558 network command. Following are input_dicts to create static routes
3559 and advertise networks using network command. Any one of the input_dict
3560 can be passed to verify_rib() to verify routes in DUT"s RIB.
3561
3562 # Creating static routes for r1
3563 input_dict = {
3564 "r1": {
3565 "static_routes": [{"network": "10.0.20.1/32", "no_of_ip": 9, \
3566 "admin_distance": 100, "next_hop": "10.0.0.2", "tag": 4001}]
3567 }}
3568 # Advertising networks using network command in router r1
3569 input_dict = {
3570 "r1": {
3571 "advertise_networks": [{"start_ip": "20.0.0.0/32",
3572 "no_of_network": 10},
3573 {"start_ip": "30.0.0.0/32"}]
3574 }}
3575 # Verifying ipv4 routes in router r1 learned via BGP
3576 dut = "r2"
3577 protocol = "bgp"
3578 result = verify_rib(tgen, "ipv4", dut, input_dict, protocol = protocol)
3579
3580 Returns
3581 -------
3582 errormsg(str) or True
3583 """
3584
3585 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
3586
3587 router_list = tgen.routers()
3588 additional_nexthops_in_required_nhs = []
3589 found_hops = []
3590 for routerInput in input_dict.keys():
3591 for router, rnode in router_list.items():
3592 if router != dut:
3593 continue
3594
3595 logger.info("Checking router %s RIB:", router)
3596
3597 # Verifying RIB routes
3598 if addr_type == "ipv4":
3599 command = "show ip route"
3600 else:
3601 command = "show ipv6 route"
3602
3603 found_routes = []
3604 missing_routes = []
3605
3606 if "static_routes" in input_dict[routerInput]:
3607 static_routes = input_dict[routerInput]["static_routes"]
3608
3609 for static_route in static_routes:
3610 if "vrf" in static_route and static_route["vrf"] is not None:
3611 logger.info(
3612 "[DUT: {}]: Verifying routes for VRF:"
3613 " {}".format(router, static_route["vrf"])
3614 )
3615
3616 cmd = "{} vrf {}".format(command, static_route["vrf"])
3617
3618 else:
3619 cmd = "{}".format(command)
3620
3621 if protocol:
3622 cmd = "{} {}".format(cmd, protocol)
3623
3624 cmd = "{} json".format(cmd)
3625
3626 rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True)
3627
3628 # Verifying output dictionary rib_routes_json is not empty
3629 if bool(rib_routes_json) is False:
3630 errormsg = "No route found in rib of router {}..".format(router)
3631 return errormsg
3632
3633 network = static_route["network"]
3634 if "no_of_ip" in static_route:
3635 no_of_ip = static_route["no_of_ip"]
3636 else:
3637 no_of_ip = 1
3638
3639 if "tag" in static_route:
3640 _tag = static_route["tag"]
3641 else:
3642 _tag = None
3643
3644 # Generating IPs for verification
3645 ip_list = generate_ips(network, no_of_ip)
3646 st_found = False
3647 nh_found = False
3648
3649 for st_rt in ip_list:
3650 st_rt = str(
3651 ipaddress.ip_network(frr_unicode(st_rt), strict=False)
3652 )
3653 _addr_type = validate_ip_address(st_rt)
3654 if _addr_type != addr_type:
3655 continue
3656
3657 if st_rt in rib_routes_json:
3658 st_found = True
3659 found_routes.append(st_rt)
3660
3661 if "queued" in rib_routes_json[st_rt][0]:
3662 errormsg = "Route {} is queued\n".format(st_rt)
3663 return errormsg
3664
3665 if fib and next_hop:
3666 if type(next_hop) is not list:
3667 next_hop = [next_hop]
3668
3669 for mnh in range(0, len(rib_routes_json[st_rt])):
3670 if not "selected" in rib_routes_json[st_rt][mnh]:
3671 continue
3672
3673 if (
3674 "fib"
3675 in rib_routes_json[st_rt][mnh]["nexthops"][0]
3676 ):
3677 found_hops.append(
3678 [
3679 rib_r["ip"]
3680 for rib_r in rib_routes_json[st_rt][
3681 mnh
3682 ]["nexthops"]
3683 ]
3684 )
3685
3686 if found_hops[0]:
3687 missing_list_of_nexthops = set(
3688 found_hops[0]
3689 ).difference(next_hop)
3690 additional_nexthops_in_required_nhs = set(
3691 next_hop
3692 ).difference(found_hops[0])
3693
3694 if additional_nexthops_in_required_nhs:
3695 logger.info(
3696 "Nexthop "
3697 "%s is not active for route %s in "
3698 "RIB of router %s\n",
3699 additional_nexthops_in_required_nhs,
3700 st_rt,
3701 dut,
3702 )
3703 errormsg = (
3704 "Nexthop {} is not active"
3705 " for route {} in RIB of router"
3706 " {}\n".format(
3707 additional_nexthops_in_required_nhs,
3708 st_rt,
3709 dut,
3710 )
3711 )
3712 return errormsg
3713 else:
3714 nh_found = True
3715
3716 elif next_hop and fib is None:
3717 if type(next_hop) is not list:
3718 next_hop = [next_hop]
3719 found_hops = [
3720 rib_r["ip"]
3721 for rib_r in rib_routes_json[st_rt][0]["nexthops"]
3722 if "ip" in rib_r
3723 ]
3724
3725 # If somehow key "ip" is not found in nexthops JSON
3726 # then found_hops would be 0, this particular
3727 # situation will be handled here
3728 if not len(found_hops):
3729 errormsg = (
3730 "Nexthop {} is Missing for "
3731 "route {} in RIB of router {}\n".format(
3732 next_hop,
3733 st_rt,
3734 dut,
3735 )
3736 )
3737 return errormsg
3738
3739 # Check only the count of nexthops
3740 if count_only:
3741 if len(next_hop) == len(found_hops):
3742 nh_found = True
3743 else:
3744 errormsg = (
3745 "Nexthops are missing for "
3746 "route {} in RIB of router {}: "
3747 "expected {}, found {}\n".format(
3748 st_rt,
3749 dut,
3750 len(next_hop),
3751 len(found_hops),
3752 )
3753 )
3754 return errormsg
3755
3756 # Check the actual nexthops
3757 elif found_hops:
3758 missing_list_of_nexthops = set(
3759 found_hops
3760 ).difference(next_hop)
3761 additional_nexthops_in_required_nhs = set(
3762 next_hop
3763 ).difference(found_hops)
3764
3765 if additional_nexthops_in_required_nhs:
3766 logger.info(
3767 "Missing nexthop %s for route"
3768 " %s in RIB of router %s\n",
3769 additional_nexthops_in_required_nhs,
3770 st_rt,
3771 dut,
3772 )
3773 errormsg = (
3774 "Nexthop {} is Missing for "
3775 "route {} in RIB of router {}\n".format(
3776 additional_nexthops_in_required_nhs,
3777 st_rt,
3778 dut,
3779 )
3780 )
3781 return errormsg
3782 else:
3783 nh_found = True
3784
3785 if tag:
3786 if "tag" not in rib_routes_json[st_rt][0]:
3787 errormsg = (
3788 "[DUT: {}]: tag is not"
3789 " present for"
3790 " route {} in RIB \n".format(dut, st_rt)
3791 )
3792 return errormsg
3793
3794 if _tag != rib_routes_json[st_rt][0]["tag"]:
3795 errormsg = (
3796 "[DUT: {}]: tag value {}"
3797 " is not matched for"
3798 " route {} in RIB \n".format(
3799 dut,
3800 _tag,
3801 st_rt,
3802 )
3803 )
3804 return errormsg
3805
3806 if admin_distance is not None:
3807 if "distance" not in rib_routes_json[st_rt][0]:
3808 errormsg = (
3809 "[DUT: {}]: admin distance is"
3810 " not present for"
3811 " route {} in RIB \n".format(dut, st_rt)
3812 )
3813 return errormsg
3814
3815 if (
3816 admin_distance
3817 != rib_routes_json[st_rt][0]["distance"]
3818 ):
3819 errormsg = (
3820 "[DUT: {}]: admin distance value "
3821 "{} is not matched for "
3822 "route {} in RIB \n".format(
3823 dut,
3824 admin_distance,
3825 st_rt,
3826 )
3827 )
3828 return errormsg
3829
3830 if metric is not None:
3831 if "metric" not in rib_routes_json[st_rt][0]:
3832 errormsg = (
3833 "[DUT: {}]: metric is"
3834 " not present for"
3835 " route {} in RIB \n".format(dut, st_rt)
3836 )
3837 return errormsg
3838
3839 if metric != rib_routes_json[st_rt][0]["metric"]:
3840 errormsg = (
3841 "[DUT: {}]: metric value "
3842 "{} is not matched for "
3843 "route {} in RIB \n".format(
3844 dut,
3845 metric,
3846 st_rt,
3847 )
3848 )
3849 return errormsg
3850
3851 else:
3852 missing_routes.append(st_rt)
3853
3854 if nh_found:
3855 logger.info(
3856 "[DUT: {}]: Found next_hop {} for"
3857 " RIB routes: {}".format(router, next_hop, found_routes)
3858 )
3859
3860 if len(missing_routes) > 0:
3861 errormsg = "[DUT: {}]: Missing route in RIB, " "routes: {}".format(
3862 dut, missing_routes
3863 )
3864 return errormsg
3865
3866 if found_routes:
3867 logger.info(
3868 "[DUT: %s]: Verified routes in RIB, found" " routes are: %s\n",
3869 dut,
3870 found_routes,
3871 )
3872
3873 continue
3874
3875 if "bgp" in input_dict[routerInput]:
3876 if (
3877 "advertise_networks"
3878 not in input_dict[routerInput]["bgp"]["address_family"][addr_type][
3879 "unicast"
3880 ]
3881 ):
3882 continue
3883
3884 found_routes = []
3885 missing_routes = []
3886 advertise_network = input_dict[routerInput]["bgp"]["address_family"][
3887 addr_type
3888 ]["unicast"]["advertise_networks"]
3889
3890 # Continue if there are no network advertise
3891 if len(advertise_network) == 0:
3892 continue
3893
3894 for advertise_network_dict in advertise_network:
3895 if "vrf" in advertise_network_dict:
3896 cmd = "{} vrf {} json".format(
3897 command, advertise_network_dict["vrf"]
3898 )
3899 else:
3900 cmd = "{} json".format(command)
3901
3902 rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True)
3903
3904 # Verifying output dictionary rib_routes_json is not empty
3905 if bool(rib_routes_json) is False:
3906 errormsg = "No route found in rib of router {}..".format(router)
3907 return errormsg
3908
3909 start_ip = advertise_network_dict["network"]
3910 if "no_of_network" in advertise_network_dict:
3911 no_of_network = advertise_network_dict["no_of_network"]
3912 else:
3913 no_of_network = 1
3914
3915 # Generating IPs for verification
3916 ip_list = generate_ips(start_ip, no_of_network)
3917 st_found = False
3918 nh_found = False
3919
3920 for st_rt in ip_list:
3921 st_rt = str(ipaddress.ip_network(frr_unicode(st_rt), strict=False))
3922
3923 _addr_type = validate_ip_address(st_rt)
3924 if _addr_type != addr_type:
3925 continue
3926
3927 if st_rt in rib_routes_json:
3928 st_found = True
3929 found_routes.append(st_rt)
3930
3931 if "queued" in rib_routes_json[st_rt][0]:
3932 errormsg = "Route {} is queued\n".format(st_rt)
3933 return errormsg
3934
3935 if next_hop:
3936 if type(next_hop) is not list:
3937 next_hop = [next_hop]
3938
3939 count = 0
3940 for nh in next_hop:
3941 for nh_dict in rib_routes_json[st_rt][0]["nexthops"]:
3942 if nh_dict["ip"] != nh:
3943 continue
3944 else:
3945 count += 1
3946
3947 if count == len(next_hop):
3948 nh_found = True
3949 else:
3950 errormsg = (
3951 "Nexthop {} is Missing"
3952 " for route {} in "
3953 "RIB of router {}\n".format(next_hop, st_rt, dut)
3954 )
3955 return errormsg
3956 else:
3957 missing_routes.append(st_rt)
3958
3959 if nh_found:
3960 logger.info(
3961 "Found next_hop {} for all routes in RIB"
3962 " of router {}\n".format(next_hop, dut)
3963 )
3964
3965 if len(missing_routes) > 0:
3966 errormsg = (
3967 "Missing {} route in RIB of router {}, "
3968 "routes: {} \n".format(addr_type, dut, missing_routes)
3969 )
3970 return errormsg
3971
3972 if found_routes:
3973 logger.info(
3974 "Verified {} routes in router {} RIB, found"
3975 " routes are: {}\n".format(addr_type, dut, found_routes)
3976 )
3977
3978 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
3979 return True
3980
3981
3982 @retry(retry_timeout=12)
3983 def verify_fib_routes(tgen, addr_type, dut, input_dict, next_hop=None, protocol=None):
3984 """
3985 Data will be read from input_dict or input JSON file, API will generate
3986 same prefixes, which were redistributed by either create_static_routes() or
3987 advertise_networks_using_network_command() and will verify next_hop and
3988 each prefix/routes is present in "show ip/ipv6 fib json"
3989 command o/p.
3990
3991 Parameters
3992 ----------
3993 * `tgen` : topogen object
3994 * `addr_type` : ip type, ipv4/ipv6
3995 * `dut`: Device Under Test, for which user wants to test the data
3996 * `input_dict` : input dict, has details of static routes
3997 * `next_hop`[optional]: next_hop which needs to be verified,
3998 default: static
3999
4000 Usage
4001 -----
4002 input_routes_r1 = {
4003 "r1": {
4004 "static_routes": [{
4005 "network": ["1.1.1.1/32],
4006 "next_hop": "Null0",
4007 "vrf": "RED"
4008 }]
4009 }
4010 }
4011 result = result = verify_fib_routes(tgen, "ipv4, "r1", input_routes_r1)
4012
4013 Returns
4014 -------
4015 errormsg(str) or True
4016 """
4017
4018 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
4019
4020 router_list = tgen.routers()
4021 if dut not in router_list:
4022 return
4023
4024 for routerInput in input_dict.keys():
4025 # XXX replace with router = dut; rnode = router_list[dut]
4026 for router, rnode in router_list.items():
4027 if router != dut:
4028 continue
4029
4030 logger.info("Checking router %s FIB routes:", router)
4031
4032 # Verifying RIB routes
4033 if addr_type == "ipv4":
4034 command = "show ip fib"
4035 else:
4036 command = "show ipv6 fib"
4037
4038 found_routes = []
4039 missing_routes = []
4040
4041 if protocol:
4042 command = "{} {}".format(command, protocol)
4043
4044 if "static_routes" in input_dict[routerInput]:
4045 static_routes = input_dict[routerInput]["static_routes"]
4046
4047 for static_route in static_routes:
4048 if "vrf" in static_route and static_route["vrf"] is not None:
4049 logger.info(
4050 "[DUT: {}]: Verifying routes for VRF:"
4051 " {}".format(router, static_route["vrf"])
4052 )
4053
4054 cmd = "{} vrf {}".format(command, static_route["vrf"])
4055
4056 else:
4057 cmd = "{}".format(command)
4058
4059 cmd = "{} json".format(cmd)
4060
4061 rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True)
4062
4063 # Verifying output dictionary rib_routes_json is not empty
4064 if bool(rib_routes_json) is False:
4065 errormsg = "[DUT: {}]: No route found in fib".format(router)
4066 return errormsg
4067
4068 network = static_route["network"]
4069 if "no_of_ip" in static_route:
4070 no_of_ip = static_route["no_of_ip"]
4071 else:
4072 no_of_ip = 1
4073
4074 # Generating IPs for verification
4075 ip_list = generate_ips(network, no_of_ip)
4076 st_found = False
4077 nh_found = False
4078
4079 for st_rt in ip_list:
4080 st_rt = str(
4081 ipaddress.ip_network(frr_unicode(st_rt), strict=False)
4082 )
4083 _addr_type = validate_ip_address(st_rt)
4084 if _addr_type != addr_type:
4085 continue
4086
4087 if st_rt in rib_routes_json:
4088 st_found = True
4089 found_routes.append(st_rt)
4090
4091 if next_hop:
4092 if type(next_hop) is not list:
4093 next_hop = [next_hop]
4094
4095 count = 0
4096 for nh in next_hop:
4097 for nh_dict in rib_routes_json[st_rt][0][
4098 "nexthops"
4099 ]:
4100 if nh_dict["ip"] != nh:
4101 continue
4102 else:
4103 count += 1
4104
4105 if count == len(next_hop):
4106 nh_found = True
4107 else:
4108 missing_routes.append(st_rt)
4109 errormsg = (
4110 "Nexthop {} is Missing"
4111 " for route {} in "
4112 "RIB of router {}\n".format(
4113 next_hop, st_rt, dut
4114 )
4115 )
4116 return errormsg
4117
4118 else:
4119 missing_routes.append(st_rt)
4120
4121 if len(missing_routes) > 0:
4122 errormsg = "[DUT: {}]: Missing route in FIB:" " {}".format(
4123 dut, missing_routes
4124 )
4125 return errormsg
4126
4127 if nh_found:
4128 logger.info(
4129 "Found next_hop {} for all routes in RIB"
4130 " of router {}\n".format(next_hop, dut)
4131 )
4132
4133 if found_routes:
4134 logger.info(
4135 "[DUT: %s]: Verified routes in FIB, found" " routes are: %s\n",
4136 dut,
4137 found_routes,
4138 )
4139
4140 continue
4141
4142 if "bgp" in input_dict[routerInput]:
4143 if (
4144 "advertise_networks"
4145 not in input_dict[routerInput]["bgp"]["address_family"][addr_type][
4146 "unicast"
4147 ]
4148 ):
4149 continue
4150
4151 found_routes = []
4152 missing_routes = []
4153 advertise_network = input_dict[routerInput]["bgp"]["address_family"][
4154 addr_type
4155 ]["unicast"]["advertise_networks"]
4156
4157 # Continue if there are no network advertise
4158 if len(advertise_network) == 0:
4159 continue
4160
4161 for advertise_network_dict in advertise_network:
4162 if "vrf" in advertise_network_dict:
4163 cmd = "{} vrf {} json".format(command, static_route["vrf"])
4164 else:
4165 cmd = "{} json".format(command)
4166
4167 rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True)
4168
4169 # Verifying output dictionary rib_routes_json is not empty
4170 if bool(rib_routes_json) is False:
4171 errormsg = "No route found in rib of router {}..".format(router)
4172 return errormsg
4173
4174 start_ip = advertise_network_dict["network"]
4175 if "no_of_network" in advertise_network_dict:
4176 no_of_network = advertise_network_dict["no_of_network"]
4177 else:
4178 no_of_network = 1
4179
4180 # Generating IPs for verification
4181 ip_list = generate_ips(start_ip, no_of_network)
4182 st_found = False
4183 nh_found = False
4184
4185 for st_rt in ip_list:
4186 st_rt = str(ipaddress.ip_network(frr_unicode(st_rt), strict=False))
4187
4188 _addr_type = validate_ip_address(st_rt)
4189 if _addr_type != addr_type:
4190 continue
4191
4192 if st_rt in rib_routes_json:
4193 st_found = True
4194 found_routes.append(st_rt)
4195
4196 if next_hop:
4197 if type(next_hop) is not list:
4198 next_hop = [next_hop]
4199
4200 count = 0
4201 for nh in next_hop:
4202 for nh_dict in rib_routes_json[st_rt][0]["nexthops"]:
4203 if nh_dict["ip"] != nh:
4204 continue
4205 else:
4206 count += 1
4207
4208 if count == len(next_hop):
4209 nh_found = True
4210 else:
4211 missing_routes.append(st_rt)
4212 errormsg = (
4213 "Nexthop {} is Missing"
4214 " for route {} in "
4215 "RIB of router {}\n".format(next_hop, st_rt, dut)
4216 )
4217 return errormsg
4218 else:
4219 missing_routes.append(st_rt)
4220
4221 if len(missing_routes) > 0:
4222 errormsg = "[DUT: {}]: Missing route in FIB: " "{} \n".format(
4223 dut, missing_routes
4224 )
4225 return errormsg
4226
4227 if nh_found:
4228 logger.info(
4229 "Found next_hop {} for all routes in RIB"
4230 " of router {}\n".format(next_hop, dut)
4231 )
4232
4233 if found_routes:
4234 logger.info(
4235 "[DUT: {}]: Verified routes FIB"
4236 ", found routes are: {}\n".format(dut, found_routes)
4237 )
4238
4239 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
4240 return True
4241
4242
4243 def verify_admin_distance_for_static_routes(tgen, input_dict):
4244 """
4245 API to verify admin distance for static routes as defined in input_dict/
4246 input JSON by running show ip/ipv6 route json command.
4247 Parameter
4248 ---------
4249 * `tgen` : topogen object
4250 * `input_dict`: having details like - for which router and static routes
4251 admin dsitance needs to be verified
4252 Usage
4253 -----
4254 # To verify admin distance is 10 for prefix 10.0.20.1/32 having next_hop
4255 10.0.0.2 in router r1
4256 input_dict = {
4257 "r1": {
4258 "static_routes": [{
4259 "network": "10.0.20.1/32",
4260 "admin_distance": 10,
4261 "next_hop": "10.0.0.2"
4262 }]
4263 }
4264 }
4265 result = verify_admin_distance_for_static_routes(tgen, input_dict)
4266 Returns
4267 -------
4268 errormsg(str) or True
4269 """
4270
4271 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
4272
4273 router_list = tgen.routers()
4274 for router in input_dict.keys():
4275 if router not in router_list:
4276 continue
4277 rnode = router_list[router]
4278
4279 for static_route in input_dict[router]["static_routes"]:
4280 addr_type = validate_ip_address(static_route["network"])
4281 # Command to execute
4282 if addr_type == "ipv4":
4283 command = "show ip route json"
4284 else:
4285 command = "show ipv6 route json"
4286 show_ip_route_json = run_frr_cmd(rnode, command, isjson=True)
4287
4288 logger.info(
4289 "Verifying admin distance for static route %s" " under dut %s:",
4290 static_route,
4291 router,
4292 )
4293 network = static_route["network"]
4294 next_hop = static_route["next_hop"]
4295 admin_distance = static_route["admin_distance"]
4296 route_data = show_ip_route_json[network][0]
4297 if network in show_ip_route_json:
4298 if route_data["nexthops"][0]["ip"] == next_hop:
4299 if route_data["distance"] != admin_distance:
4300 errormsg = (
4301 "Verification failed: admin distance"
4302 " for static route {} under dut {},"
4303 " found:{} but expected:{}".format(
4304 static_route,
4305 router,
4306 route_data["distance"],
4307 admin_distance,
4308 )
4309 )
4310 return errormsg
4311 else:
4312 logger.info(
4313 "Verification successful: admin"
4314 " distance for static route %s under"
4315 " dut %s, found:%s",
4316 static_route,
4317 router,
4318 route_data["distance"],
4319 )
4320
4321 else:
4322 errormsg = (
4323 "Static route {} not found in "
4324 "show_ip_route_json for dut {}".format(network, router)
4325 )
4326 return errormsg
4327
4328 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
4329 return True
4330
4331
4332 def verify_prefix_lists(tgen, input_dict):
4333 """
4334 Running "show ip prefix-list" command and verifying given prefix-list
4335 is present in router.
4336 Parameters
4337 ----------
4338 * `tgen` : topogen object
4339 * `input_dict`: data to verify prefix lists
4340 Usage
4341 -----
4342 # To verify pf_list_1 is present in router r1
4343 input_dict = {
4344 "r1": {
4345 "prefix_lists": ["pf_list_1"]
4346 }}
4347 result = verify_prefix_lists("ipv4", input_dict, tgen)
4348 Returns
4349 -------
4350 errormsg(str) or True
4351 """
4352
4353 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
4354
4355 router_list = tgen.routers()
4356 for router in input_dict.keys():
4357 if router not in router_list:
4358 continue
4359
4360 rnode = router_list[router]
4361
4362 # Show ip prefix list
4363 show_prefix_list = run_frr_cmd(rnode, "show ip prefix-list")
4364
4365 # Verify Prefix list is deleted
4366 prefix_lists_addr = input_dict[router]["prefix_lists"]
4367 for addr_type in prefix_lists_addr:
4368 if not check_address_types(addr_type):
4369 continue
4370 # show ip prefix list
4371 if addr_type == "ipv4":
4372 cmd = "show ip prefix-list"
4373 else:
4374 cmd = "show {} prefix-list".format(addr_type)
4375 show_prefix_list = run_frr_cmd(rnode, cmd)
4376 for prefix_list in prefix_lists_addr[addr_type].keys():
4377 if prefix_list in show_prefix_list:
4378 errormsg = (
4379 "Prefix list {} is/are present in the router"
4380 " {}".format(prefix_list, router)
4381 )
4382 return errormsg
4383
4384 logger.info(
4385 "Prefix list %s is/are not present in the router" " from router %s",
4386 prefix_list,
4387 router,
4388 )
4389
4390 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
4391 return True
4392
4393
4394 @retry(retry_timeout=12)
4395 def verify_route_maps(tgen, input_dict):
4396 """
4397 Running "show route-map" command and verifying given route-map
4398 is present in router.
4399 Parameters
4400 ----------
4401 * `tgen` : topogen object
4402 * `input_dict`: data to verify prefix lists
4403 Usage
4404 -----
4405 # To verify rmap_1 and rmap_2 are present in router r1
4406 input_dict = {
4407 "r1": {
4408 "route_maps": ["rmap_1", "rmap_2"]
4409 }
4410 }
4411 result = verify_route_maps(tgen, input_dict)
4412 Returns
4413 -------
4414 errormsg(str) or True
4415 """
4416
4417 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
4418
4419 router_list = tgen.routers()
4420 for router in input_dict.keys():
4421 if router not in router_list:
4422 continue
4423
4424 rnode = router_list[router]
4425 # Show ip route-map
4426 show_route_maps = rnode.vtysh_cmd("show route-map")
4427
4428 # Verify route-map is deleted
4429 route_maps = input_dict[router]["route_maps"]
4430 for route_map in route_maps:
4431 if route_map in show_route_maps:
4432 errormsg = "Route map {} is not deleted from router" " {}".format(
4433 route_map, router
4434 )
4435 return errormsg
4436
4437 logger.info(
4438 "Route map %s is/are deleted successfully from" " router %s",
4439 route_maps,
4440 router,
4441 )
4442
4443 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
4444 return True
4445
4446
4447 @retry(retry_timeout=16)
4448 def verify_bgp_community(tgen, addr_type, router, network, input_dict=None):
4449 """
4450 API to veiryf BGP large community is attached in route for any given
4451 DUT by running "show bgp ipv4/6 {route address} json" command.
4452 Parameters
4453 ----------
4454 * `tgen`: topogen object
4455 * `addr_type` : ip type, ipv4/ipv6
4456 * `dut`: Device Under Test
4457 * `network`: network for which set criteria needs to be verified
4458 * `input_dict`: having details like - for which router, community and
4459 values needs to be verified
4460 Usage
4461 -----
4462 networks = ["200.50.2.0/32"]
4463 input_dict = {
4464 "largeCommunity": "2:1:1 2:2:2 2:3:3 2:4:4 2:5:5"
4465 }
4466 result = verify_bgp_community(tgen, "ipv4", dut, network, input_dict=None)
4467 Returns
4468 -------
4469 errormsg(str) or True
4470 """
4471
4472 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
4473 router_list = tgen.routers()
4474 if router not in router_list:
4475 return False
4476
4477 rnode = router_list[router]
4478
4479 logger.debug(
4480 "Verifying BGP community attributes on dut %s: for %s " "network %s",
4481 router,
4482 addr_type,
4483 network,
4484 )
4485
4486 for net in network:
4487 cmd = "show bgp {} {} json".format(addr_type, net)
4488 show_bgp_json = rnode.vtysh_cmd(cmd, isjson=True)
4489 logger.info(show_bgp_json)
4490 if "paths" not in show_bgp_json:
4491 return "Prefix {} not found in BGP table of router: {}".format(net, router)
4492
4493 as_paths = show_bgp_json["paths"]
4494 found = False
4495 for i in range(len(as_paths)):
4496 if (
4497 "largeCommunity" in show_bgp_json["paths"][i]
4498 or "community" in show_bgp_json["paths"][i]
4499 ):
4500 found = True
4501 logger.info(
4502 "Large Community attribute is found for route:" " %s in router: %s",
4503 net,
4504 router,
4505 )
4506 if input_dict is not None:
4507 for criteria, comm_val in input_dict.items():
4508 show_val = show_bgp_json["paths"][i][criteria]["string"]
4509 if comm_val == show_val:
4510 logger.info(
4511 "Verifying BGP %s for prefix: %s"
4512 " in router: %s, found expected"
4513 " value: %s",
4514 criteria,
4515 net,
4516 router,
4517 comm_val,
4518 )
4519 else:
4520 errormsg = (
4521 "Failed: Verifying BGP attribute"
4522 " {} for route: {} in router: {}"
4523 ", expected value: {} but found"
4524 ": {}".format(criteria, net, router, comm_val, show_val)
4525 )
4526 return errormsg
4527
4528 if not found:
4529 errormsg = (
4530 "Large Community attribute is not found for route: "
4531 "{} in router: {} ".format(net, router)
4532 )
4533 return errormsg
4534
4535 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
4536 return True
4537
4538
4539 def get_ipv6_linklocal_address(topo, node, intf):
4540 """
4541 API to get the link local ipv6 address of a particular interface
4542
4543 Parameters
4544 ----------
4545 * `node`: node on which link local ip to be fetched.
4546 * `intf` : interface for which link local ip needs to be returned.
4547 * `topo` : base topo
4548
4549 Usage
4550 -----
4551 result = get_ipv6_linklocal_address(topo, 'r1', 'r2')
4552
4553 Returns link local ip of interface between r1 and r2.
4554
4555 Returns
4556 -------
4557 1) link local ipv6 address from the interface
4558 2) errormsg - when link local ip not found
4559 """
4560 tgen = get_topogen()
4561 ext_nh = tgen.net[node].get_ipv6_linklocal()
4562 req_nh = topo[node]["links"][intf]["interface"]
4563 llip = None
4564 for llips in ext_nh:
4565 if llips[0] == req_nh:
4566 llip = llips[1]
4567 logger.info("Link local ip found = %s", llip)
4568 return llip
4569
4570 errormsg = "Failed: Link local ip not found on router {}, " "interface {}".format(
4571 node, intf
4572 )
4573
4574 return errormsg
4575
4576
4577 def verify_create_community_list(tgen, input_dict):
4578 """
4579 API is to verify if large community list is created for any given DUT in
4580 input_dict by running "sh bgp large-community-list {"comm_name"} detail"
4581 command.
4582 Parameters
4583 ----------
4584 * `tgen`: topogen object
4585 * `input_dict`: having details like - for which router, large community
4586 needs to be verified
4587 Usage
4588 -----
4589 input_dict = {
4590 "r1": {
4591 "large-community-list": {
4592 "standard": {
4593 "Test1": [{"action": "PERMIT", "attribute":\
4594 ""}]
4595 }}}}
4596 result = verify_create_community_list(tgen, input_dict)
4597 Returns
4598 -------
4599 errormsg(str) or True
4600 """
4601
4602 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
4603
4604 router_list = tgen.routers()
4605 for router in input_dict.keys():
4606 if router not in router_list:
4607 continue
4608
4609 rnode = router_list[router]
4610
4611 logger.info("Verifying large-community is created for dut %s:", router)
4612
4613 for comm_data in input_dict[router]["bgp_community_lists"]:
4614 comm_name = comm_data["name"]
4615 comm_type = comm_data["community_type"]
4616 show_bgp_community = run_frr_cmd(
4617 rnode, "show bgp large-community-list {} detail".format(comm_name)
4618 )
4619
4620 # Verify community list and type
4621 if comm_name in show_bgp_community and comm_type in show_bgp_community:
4622 logger.info(
4623 "BGP %s large-community-list %s is" " created", comm_type, comm_name
4624 )
4625 else:
4626 errormsg = "BGP {} large-community-list {} is not" " created".format(
4627 comm_type, comm_name
4628 )
4629 return errormsg
4630
4631 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
4632 return True
4633
4634
4635 def verify_cli_json(tgen, input_dict):
4636 """
4637 API to verify if JSON is available for clis
4638 command.
4639 Parameters
4640 ----------
4641 * `tgen`: topogen object
4642 * `input_dict`: CLIs for which JSON needs to be verified
4643 Usage
4644 -----
4645 input_dict = {
4646 "edge1":{
4647 "cli": ["show evpn vni detail", show evpn rmac vni all]
4648 }
4649 }
4650
4651 result = verify_cli_json(tgen, input_dict)
4652
4653 Returns
4654 -------
4655 errormsg(str) or True
4656 """
4657
4658 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
4659 for dut in input_dict.keys():
4660 rnode = tgen.gears[dut]
4661
4662 for cli in input_dict[dut]["cli"]:
4663 logger.info(
4664 "[DUT: %s]: Verifying JSON is available for " "CLI %s :", dut, cli
4665 )
4666
4667 test_cli = "{} json".format(cli)
4668 ret_json = rnode.vtysh_cmd(test_cli, isjson=True)
4669 if not bool(ret_json):
4670 errormsg = "CLI: %s, JSON format is not available" % (cli)
4671 return errormsg
4672 elif "unknown" in ret_json or "Unknown" in ret_json:
4673 errormsg = "CLI: %s, JSON format is not available" % (cli)
4674 return errormsg
4675 else:
4676 logger.info(
4677 "CLI : %s JSON format is available: " "\n %s", cli, ret_json
4678 )
4679
4680 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
4681
4682 return True
4683
4684
4685 @retry(retry_timeout=12)
4686 def verify_evpn_vni(tgen, input_dict):
4687 """
4688 API to verify evpn vni details using "show evpn vni detail json"
4689 command.
4690
4691 Parameters
4692 ----------
4693 * `tgen`: topogen object
4694 * `input_dict`: having details like - for which router, evpn details
4695 needs to be verified
4696 Usage
4697 -----
4698 input_dict = {
4699 "edge1":{
4700 "vni": [
4701 {
4702 "75100":{
4703 "vrf": "RED",
4704 "vxlanIntf": "vxlan75100",
4705 "localVtepIp": "120.1.1.1",
4706 "sviIntf": "br100"
4707 }
4708 }
4709 ]
4710 }
4711 }
4712
4713 result = verify_evpn_vni(tgen, input_dict)
4714
4715 Returns
4716 -------
4717 errormsg(str) or True
4718 """
4719
4720 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
4721 for dut in input_dict.keys():
4722 rnode = tgen.gears[dut]
4723
4724 logger.info("[DUT: %s]: Verifying evpn vni details :", dut)
4725
4726 cmd = "show evpn vni detail json"
4727 evpn_all_vni_json = run_frr_cmd(rnode, cmd, isjson=True)
4728 if not bool(evpn_all_vni_json):
4729 errormsg = "No output for '{}' cli".format(cmd)
4730 return errormsg
4731
4732 if "vni" in input_dict[dut]:
4733 for vni_dict in input_dict[dut]["vni"]:
4734 found = False
4735 vni = vni_dict["name"]
4736 for evpn_vni_json in evpn_all_vni_json:
4737 if "vni" in evpn_vni_json:
4738 if evpn_vni_json["vni"] != int(vni):
4739 continue
4740
4741 for attribute in vni_dict.keys():
4742 if vni_dict[attribute] != evpn_vni_json[attribute]:
4743 errormsg = (
4744 "[DUT: %s] Verifying "
4745 "%s for VNI: %s [FAILED]||"
4746 ", EXPECTED : %s "
4747 " FOUND : %s"
4748 % (
4749 dut,
4750 attribute,
4751 vni,
4752 vni_dict[attribute],
4753 evpn_vni_json[attribute],
4754 )
4755 )
4756 return errormsg
4757
4758 else:
4759 found = True
4760 logger.info(
4761 "[DUT: %s] Verifying"
4762 " %s for VNI: %s , "
4763 "Found Expected : %s ",
4764 dut,
4765 attribute,
4766 vni,
4767 evpn_vni_json[attribute],
4768 )
4769
4770 if evpn_vni_json["state"] != "Up":
4771 errormsg = (
4772 "[DUT: %s] Failed: Verifying"
4773 " State for VNI: %s is not Up" % (dut, vni)
4774 )
4775 return errormsg
4776
4777 else:
4778 errormsg = (
4779 "[DUT: %s] Failed:"
4780 " VNI: %s is not present in JSON" % (dut, vni)
4781 )
4782 return errormsg
4783
4784 if found:
4785 logger.info(
4786 "[DUT %s]: Verifying VNI : %s "
4787 "details and state is Up [PASSED]!!",
4788 dut,
4789 vni,
4790 )
4791 return True
4792
4793 else:
4794 errormsg = (
4795 "[DUT: %s] Failed:" " vni details are not present in input data" % (dut)
4796 )
4797 return errormsg
4798
4799 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
4800 return False
4801
4802
4803 @retry(retry_timeout=12)
4804 def verify_vrf_vni(tgen, input_dict):
4805 """
4806 API to verify vrf vni details using "show vrf vni json"
4807 command.
4808 Parameters
4809 ----------
4810 * `tgen`: topogen object
4811 * `input_dict`: having details like - for which router, evpn details
4812 needs to be verified
4813 Usage
4814 -----
4815 input_dict = {
4816 "edge1":{
4817 "vrfs": [
4818 {
4819 "RED":{
4820 "vni": 75000,
4821 "vxlanIntf": "vxlan75100",
4822 "sviIntf": "br100",
4823 "routerMac": "00:80:48:ba:d1:00",
4824 "state": "Up"
4825 }
4826 }
4827 ]
4828 }
4829 }
4830
4831 result = verify_vrf_vni(tgen, input_dict)
4832
4833 Returns
4834 -------
4835 errormsg(str) or True
4836 """
4837
4838 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
4839 for dut in input_dict.keys():
4840 rnode = tgen.gears[dut]
4841
4842 logger.info("[DUT: %s]: Verifying vrf vni details :", dut)
4843
4844 cmd = "show vrf vni json"
4845 vrf_all_vni_json = run_frr_cmd(rnode, cmd, isjson=True)
4846 if not bool(vrf_all_vni_json):
4847 errormsg = "No output for '{}' cli".format(cmd)
4848 return errormsg
4849
4850 if "vrfs" in input_dict[dut]:
4851 for vrfs in input_dict[dut]["vrfs"]:
4852 for vrf, vrf_dict in vrfs.items():
4853 found = False
4854 for vrf_vni_json in vrf_all_vni_json["vrfs"]:
4855 if "vrf" in vrf_vni_json:
4856 if vrf_vni_json["vrf"] != vrf:
4857 continue
4858
4859 for attribute in vrf_dict.keys():
4860 if vrf_dict[attribute] == vrf_vni_json[attribute]:
4861 found = True
4862 logger.info(
4863 "[DUT %s]: VRF: %s, "
4864 "verifying %s "
4865 ", Found Expected: %s "
4866 "[PASSED]!!",
4867 dut,
4868 vrf,
4869 attribute,
4870 vrf_vni_json[attribute],
4871 )
4872 else:
4873 errormsg = (
4874 "[DUT: %s] VRF: %s, "
4875 "verifying %s [FAILED!!] "
4876 ", EXPECTED : %s "
4877 ", FOUND : %s"
4878 % (
4879 dut,
4880 vrf,
4881 attribute,
4882 vrf_dict[attribute],
4883 vrf_vni_json[attribute],
4884 )
4885 )
4886 return errormsg
4887
4888 else:
4889 errormsg = "[DUT: %s] VRF: %s " "is not present in JSON" % (
4890 dut,
4891 vrf,
4892 )
4893 return errormsg
4894
4895 if found:
4896 logger.info(
4897 "[DUT %s] Verifying VRF: %s " " details [PASSED]!!",
4898 dut,
4899 vrf,
4900 )
4901 return True
4902
4903 else:
4904 errormsg = (
4905 "[DUT: %s] Failed:" " vrf details are not present in input data" % (dut)
4906 )
4907 return errormsg
4908
4909 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
4910 return False
4911
4912
4913 def required_linux_kernel_version(required_version):
4914 """
4915 This API is used to check linux version compatibility of the test suite.
4916 If version mentioned in required_version is higher than the linux kernel
4917 of the system, test suite will be skipped. This API returns true or errormsg.
4918
4919 Parameters
4920 ----------
4921 * `required_version` : Kernel version required for the suites to run.
4922
4923 Usage
4924 -----
4925 result = linux_kernel_version_lowerthan('4.15')
4926
4927 Returns
4928 -------
4929 errormsg(str) or True
4930 """
4931 system_kernel = platform.release()
4932 if version_cmp(system_kernel, required_version) < 0:
4933 error_msg = (
4934 'These tests will not run on kernel "{}", '
4935 "they require kernel >= {})".format(system_kernel, required_version)
4936 )
4937
4938 logger.info(error_msg)
4939
4940 return error_msg
4941 return True
4942
4943
4944 class HostApplicationHelper(object):
4945 """Helper to track and cleanup per-host based test processes."""
4946
4947 def __init__(self, tgen=None, base_cmd=None):
4948 self.base_cmd_str = ""
4949 self.host_procs = {}
4950 self.tgen = None
4951 self.set_base_cmd(base_cmd if base_cmd else [])
4952 if tgen is not None:
4953 self.init(tgen)
4954
4955 def __enter__(self):
4956 self.init()
4957 return self
4958
4959 def __exit__(self, type, value, traceback):
4960 self.cleanup()
4961
4962 def __str__(self):
4963 return "HostApplicationHelper({})".format(self.base_cmd_str)
4964
4965 def set_base_cmd(self, base_cmd):
4966 assert isinstance(base_cmd, list) or isinstance(base_cmd, tuple)
4967 self.base_cmd = base_cmd
4968 if base_cmd:
4969 self.base_cmd_str = " ".join(base_cmd)
4970 else:
4971 self.base_cmd_str = ""
4972
4973 def init(self, tgen=None):
4974 """Initialize the helper with tgen if needed.
4975
4976 If overridden, need to handle multiple entries but one init. Will be called on
4977 object creation if tgen is supplied. Will be called again on __enter__ so should
4978 not re-init if already inited.
4979 """
4980 if self.tgen:
4981 assert tgen is None or self.tgen == tgen
4982 else:
4983 self.tgen = tgen
4984
4985 def started_proc(self, host, p):
4986 """Called after process started on host.
4987
4988 Return value is passed to `stopping_proc` method."""
4989 logger.debug("%s: Doing nothing after starting process", self)
4990 return False
4991
4992 def stopping_proc(self, host, p, info):
4993 """Called after process started on host."""
4994 logger.debug("%s: Doing nothing before stopping process", self)
4995
4996 def _add_host_proc(self, host, p):
4997 v = self.started_proc(host, p)
4998
4999 if host not in self.host_procs:
5000 self.host_procs[host] = []
5001 logger.debug("%s: %s: tracking process %s", self, host, p)
5002 self.host_procs[host].append((p, v))
5003
5004 def stop_host(self, host):
5005 """Stop the process on the host.
5006
5007 Override to do additional cleanup."""
5008 if host in self.host_procs:
5009 hlogger = self.tgen.net[host].logger
5010 for p, v in self.host_procs[host]:
5011 self.stopping_proc(host, p, v)
5012 logger.debug("%s: %s: terminating process %s", self, host, p.pid)
5013 hlogger.debug("%s: %s: terminating process %s", self, host, p.pid)
5014 rc = p.poll()
5015 if rc is not None:
5016 logger.error(
5017 "%s: %s: process early exit %s: %s",
5018 self,
5019 host,
5020 p.pid,
5021 comm_error(p),
5022 )
5023 hlogger.error(
5024 "%s: %s: process early exit %s: %s",
5025 self,
5026 host,
5027 p.pid,
5028 comm_error(p),
5029 )
5030 else:
5031 p.terminate()
5032 p.wait()
5033 logger.debug(
5034 "%s: %s: terminated process %s: %s",
5035 self,
5036 host,
5037 p.pid,
5038 comm_error(p),
5039 )
5040 hlogger.debug(
5041 "%s: %s: terminated process %s: %s",
5042 self,
5043 host,
5044 p.pid,
5045 comm_error(p),
5046 )
5047
5048 del self.host_procs[host]
5049
5050 def stop_all_hosts(self):
5051 hosts = set(self.host_procs)
5052 for host in hosts:
5053 self.stop_host(host)
5054
5055 def cleanup(self):
5056 self.stop_all_hosts()
5057
5058 def run(self, host, cmd_args, **kwargs):
5059 cmd = list(self.base_cmd)
5060 cmd.extend(cmd_args)
5061 p = self.tgen.gears[host].popen(cmd, **kwargs)
5062 assert p.poll() is None
5063 self._add_host_proc(host, p)
5064 return p
5065
5066 def check_procs(self):
5067 """Check that all current processes are running, log errors if not.
5068
5069 Returns: List of stopped processes."""
5070 procs = []
5071
5072 logger.debug("%s: checking procs on hosts %s", self, self.host_procs.keys())
5073
5074 for host in self.host_procs:
5075 hlogger = self.tgen.net[host].logger
5076 for p, _ in self.host_procs[host]:
5077 logger.debug("%s: checking %s proc %s", self, host, p)
5078 rc = p.poll()
5079 if rc is None:
5080 continue
5081 logger.error(
5082 "%s: %s proc exited: %s", self, host, comm_error(p), exc_info=True
5083 )
5084 hlogger.error(
5085 "%s: %s proc exited: %s", self, host, comm_error(p), exc_info=True
5086 )
5087 procs.append(p)
5088 return procs
5089
5090
5091 class IPerfHelper(HostApplicationHelper):
5092 def __str__(self):
5093 return "IPerfHelper()"
5094
5095 def run_join(
5096 self,
5097 host,
5098 join_addr,
5099 l4Type="UDP",
5100 join_interval=1,
5101 join_intf=None,
5102 join_towards=None,
5103 ):
5104 """
5105 Use iperf to send IGMP join and listen to traffic
5106
5107 Parameters:
5108 -----------
5109 * `host`: iperf host from where IGMP join would be sent
5110 * `l4Type`: string, one of [ TCP, UDP ]
5111 * `join_addr`: multicast address (or addresses) to join to
5112 * `join_interval`: seconds between periodic bandwidth reports
5113 * `join_intf`: the interface to bind the join to
5114 * `join_towards`: router whos interface to bind the join to
5115
5116 returns: Success (bool)
5117 """
5118
5119 iperf_path = self.tgen.net.get_exec_path("iperf")
5120
5121 assert join_addr
5122 if not isinstance(join_addr, list) and not isinstance(join_addr, tuple):
5123 join_addr = [ipaddress.IPv4Address(frr_unicode(join_addr))]
5124
5125 for bindTo in join_addr:
5126 iperf_args = [iperf_path, "-s"]
5127
5128 if l4Type == "UDP":
5129 iperf_args.append("-u")
5130
5131 iperf_args.append("-B")
5132 if join_towards:
5133 to_intf = frr_unicode(
5134 self.tgen.json_topo["routers"][host]["links"][join_towards][
5135 "interface"
5136 ]
5137 )
5138 iperf_args.append("{}%{}".format(str(bindTo), to_intf))
5139 elif join_intf:
5140 iperf_args.append("{}%{}".format(str(bindTo), join_intf))
5141 else:
5142 iperf_args.append(str(bindTo))
5143
5144 if join_interval:
5145 iperf_args.append("-i")
5146 iperf_args.append(str(join_interval))
5147
5148 p = self.run(host, iperf_args)
5149 if p.poll() is not None:
5150 logger.error("IGMP join failed on %s: %s", bindTo, comm_error(p))
5151 return False
5152 return True
5153
5154 def run_traffic(
5155 self, host, sentToAddress, ttl, time=0, l4Type="UDP", bind_towards=None
5156 ):
5157 """
5158 Run iperf to send IGMP join and traffic
5159
5160 Parameters:
5161 -----------
5162 * `host`: iperf host to send traffic from
5163 * `l4Type`: string, one of [ TCP, UDP ]
5164 * `sentToAddress`: multicast address to send traffic to
5165 * `ttl`: time to live
5166 * `time`: time in seconds to transmit for
5167 * `bind_towards`: Router who's interface the source ip address is got from
5168
5169 returns: Success (bool)
5170 """
5171
5172 iperf_path = self.tgen.net.get_exec_path("iperf")
5173
5174 if sentToAddress and not isinstance(sentToAddress, list):
5175 sentToAddress = [ipaddress.IPv4Address(frr_unicode(sentToAddress))]
5176
5177 for sendTo in sentToAddress:
5178 iperf_args = [iperf_path, "-c", sendTo]
5179
5180 # Bind to Interface IP
5181 if bind_towards:
5182 ifaddr = frr_unicode(
5183 self.tgen.json_topo["routers"][host]["links"][bind_towards]["ipv4"]
5184 )
5185 ipaddr = ipaddress.IPv4Interface(ifaddr).ip
5186 iperf_args.append("-B")
5187 iperf_args.append(str(ipaddr))
5188
5189 # UDP/TCP
5190 if l4Type == "UDP":
5191 iperf_args.append("-u")
5192 iperf_args.append("-b")
5193 iperf_args.append("0.012m")
5194
5195 # TTL
5196 if ttl:
5197 iperf_args.append("-T")
5198 iperf_args.append(str(ttl))
5199
5200 # Time
5201 if time:
5202 iperf_args.append("-t")
5203 iperf_args.append(str(time))
5204
5205 p = self.run(host, iperf_args)
5206 if p.poll() is not None:
5207 logger.error(
5208 "mcast traffic send failed for %s: %s", sendTo, comm_error(p)
5209 )
5210 return False
5211
5212 return True
5213
5214
5215 def verify_ip_nht(tgen, input_dict):
5216 """
5217 Running "show ip nht" command and verifying given nexthop resolution
5218 Parameters
5219 ----------
5220 * `tgen` : topogen object
5221 * `input_dict`: data to verify nexthop
5222 Usage
5223 -----
5224 input_dict_4 = {
5225 "r1": {
5226 nh: {
5227 "Address": nh,
5228 "resolvedVia": "connected",
5229 "nexthops": {
5230 "nexthop1": {
5231 "Interface": intf
5232 }
5233 }
5234 }
5235 }
5236 }
5237 result = verify_ip_nht(tgen, input_dict_4)
5238 Returns
5239 -------
5240 errormsg(str) or True
5241 """
5242
5243 logger.debug("Entering lib API: verify_ip_nht()")
5244
5245 router_list = tgen.routers()
5246 for router in input_dict.keys():
5247 if router not in router_list:
5248 continue
5249
5250 rnode = router_list[router]
5251 nh_list = input_dict[router]
5252
5253 if validate_ip_address(next(iter(nh_list))) == "ipv6":
5254 show_ip_nht = run_frr_cmd(rnode, "show ipv6 nht")
5255 else:
5256 show_ip_nht = run_frr_cmd(rnode, "show ip nht")
5257
5258 for nh in nh_list:
5259 if nh in show_ip_nht:
5260 nht = run_frr_cmd(rnode, "show ip nht {}".format(nh))
5261 if "unresolved" in nht:
5262 errormsg = "Nexthop {} became unresolved on {}".format(nh, router)
5263 return errormsg
5264 else:
5265 logger.info("Nexthop %s is resolved on %s", nh, router)
5266 return True
5267 else:
5268 errormsg = "Nexthop {} is resolved on {}".format(nh, router)
5269 return errormsg
5270
5271 logger.debug("Exiting lib API: verify_ip_nht()")
5272 return False
5273
5274
5275 def scapy_send_raw_packet(tgen, topo, senderRouter, intf, packet=None):
5276 """
5277 Using scapy Raw() method to send BSR raw packet from one FRR
5278 to other
5279
5280 Parameters:
5281 -----------
5282 * `tgen` : Topogen object
5283 * `topo` : json file data
5284 * `senderRouter` : Sender router
5285 * `packet` : packet in raw format
5286
5287 returns:
5288 --------
5289 errormsg or True
5290 """
5291
5292 global CD
5293 result = ""
5294 logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
5295 sender_interface = intf
5296 rnode = tgen.routers()[senderRouter]
5297
5298 for destLink, data in topo["routers"][senderRouter]["links"].items():
5299 if "type" in data and data["type"] == "loopback":
5300 continue
5301
5302 if not packet:
5303 packet = topo["routers"][senderRouter]["pkt"]["test_packets"][packet][
5304 "data"
5305 ]
5306
5307 python3_path = tgen.net.get_exec_path(["python3", "python"])
5308 script_path = os.path.join(CD, "send_bsr_packet.py")
5309 cmd = "{} {} '{}' '{}' --interval=1 --count=1".format(
5310 python3_path, script_path, packet, sender_interface
5311 )
5312
5313 logger.info("Scapy cmd: \n %s", cmd)
5314 result = rnode.run(cmd)
5315
5316 if result == "":
5317 return result
5318
5319 logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
5320 return True