# SPDX-License-Identifier: ISC
#
# Copyright (c) 2019 by VMware, Inc. ("VMware")
# Used Copyright (c) 2018 by Network Device Education Foundation, Inc.
# ("NetDEF") in this file.
import ipaddress
import sys
import traceback
from copy import deepcopy
from time import sleep

# Import common_config to use commonly used APIs
from lib.common_config import (
    FRRCFG_FILE,
    InvalidCLIError,
    check_address_types,
    create_common_configurations,
    find_interface_with_greater_ip,
    generate_ips,
    get_frr_ipv6_linklocal,
    retry,
    run_frr_cmd,
)
from lib.topogen import get_topogen
from lib.topolog import logger
from lib.topotest import frr_unicode

from lib import topotest
def create_router_bgp(tgen, topo=None, input_dict=None, build=False, load_config=True):
    """
    API to configure bgp on router

    Parameters
    ----------
    * `tgen` : Topogen object
    * `topo` : json file data; when None, `tgen.json_topo` is used
    * `input_dict` : Input dict data, required when configuring from testcase
    * `build` : Only for initial setup phase this is set as True.
    * `load_config` : passed through to create_common_configurations()

    Usage
    -----
    input_dict = {
        "r1": {
            "bgp": {
                "router_id": "22.22.22.22",
                "graceful-restart": {
                    "graceful-restart": True,
                    "preserve-fw-state": True,
                    "timer": {"select-defer-time": 30},
                },
                "address_family": {
                    "ipv4": {
                        "unicast": {
                            "redistribute": [
                                {"redist_type": "static"},
                                {"redist_type": "connected"},
                            ],
                            "advertise_networks": [
                                {"network": "20.0.0.0/32"},
                                {"network": "30.0.0.0/32"},
                            ],
                            "neighbor": {
                                "r3": {
                                    "dest_link": {
                                        "r4": {
                                            "allowas-in": {"number_occurences": 2},
                                            "route_maps": [{"name": "RMAP_MED_R3"}],
                                            "next_hop_self": True,
                                        },
                                        "r1": {"graceful-restart-helper": True},
                                    }
                                }
                            },
                        }
                    }
                },
            }
        }
    }

    Returns
    -------
    Result of create_common_configurations(), or False when it raises
    InvalidCLIError.
    """
    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
    result = False

    if topo is None:
        topo = tgen.json_topo

    # Flag is used when testing ipv6 over ipv4 or vice-versa
    afi_test = False

    if not input_dict:
        input_dict = deepcopy(topo)
    else:
        topo = topo["routers"]
        # Work on a copy: the helpers below fill in defaults via setdefault().
        input_dict = deepcopy(input_dict)

    config_data_dict = {}

    for router, router_data in input_dict.items():
        if "bgp" not in router_data:
            logger.debug("Router %s: 'bgp' not present in input_dict", router)
            continue

        # A router may carry one BGP instance (dict) or several (list).
        bgp_data_list = router_data["bgp"]
        if not isinstance(bgp_data_list, list):
            bgp_data_list = [bgp_data_list]

        config_data = []

        for bgp_data in bgp_data_list:
            data_all_bgp = __create_bgp_global(tgen, bgp_data, router, build)
            if not data_all_bgp:
                continue

            bgp_addr_data = bgp_data.setdefault("address_family", {})
            if not bgp_addr_data:
                logger.debug(
                    "Router %s: 'address_family' not present in "
                    "input_dict for BGP",
                    router,
                )
            else:
                ipv4_data = bgp_addr_data.setdefault("ipv4", {})
                ipv6_data = bgp_addr_data.setdefault("ipv6", {})
                l2vpn_data = bgp_addr_data.setdefault("l2vpn", {})

                # Evaluate both setdefault() calls explicitly so the "unicast"
                # key is always created for each family.
                ipv4_unicast = ipv4_data.setdefault("unicast", {})
                ipv6_unicast = ipv6_data.setdefault("unicast", {})
                neigh_unicast = bool(ipv4_unicast or ipv6_unicast)

                l2vpn_evpn = bool(l2vpn_data.setdefault("evpn", {}))

                if neigh_unicast:
                    data_all_bgp = __create_bgp_unicast_neighbor(
                        tgen,
                        topo,
                        bgp_data,
                        router,
                        afi_test,
                        config_data=data_all_bgp,
                    )

                if l2vpn_evpn:
                    data_all_bgp = __create_l2vpn_evpn_address_family(
                        tgen, topo, bgp_data, router, config_data=data_all_bgp
                    )

            if data_all_bgp:
                config_data.extend(data_all_bgp)

        if config_data:
            config_data_dict[router] = config_data

    try:
        result = create_common_configurations(
            tgen, config_data_dict, "bgp", build, load_config
        )
    except InvalidCLIError:
        logger.error("create_router_bgp", exc_info=True)
        result = False

    logger.debug("Exiting lib API: create_router_bgp()")
    return result
def __create_bgp_global(tgen, input_dict, router, build=False):
    """
    Helper API to create bgp global configuration.

    Parameters
    ----------
    * `tgen` : Topogen object
    * `input_dict` : Input dict data, required when configuring from testcase
    * `router` : router id to be configured.
    * `build` : Only for initial setup phase this is set as True.

    Returns
    -------
    list of config commands
    """
    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))

    bgp_data = input_dict
    del_bgp_action = bgp_data.setdefault("delete", False)

    config_data = []

    if "local_as" not in bgp_data and build:
        logger.debug(
            "Router %s: 'local_as' not present in input_dict" "for BGP", router
        )
        return config_data

    local_as = bgp_data.setdefault("local_as", "")
    cmd = "router bgp {}".format(local_as)
    vrf_id = bgp_data.setdefault("vrf", None)
    if vrf_id:
        cmd = "{} vrf {}".format(cmd, vrf_id)

    if del_bgp_action:
        # Removing the whole instance: nothing else needs to be configured.
        config_data.append("no {}".format(cmd))
        return config_data

    config_data.append(cmd)
    config_data.append("no bgp ebgp-requires-policy")

    router_id = bgp_data.setdefault("router_id", None)
    del_router_id = bgp_data.setdefault("del_router_id", False)
    if del_router_id:
        config_data.append("no bgp router-id")
    if router_id:
        config_data.append("bgp router-id {}".format(router_id))

    config_data.append("bgp log-neighbor-changes")
    config_data.append("no bgp network import-check")
    bgp_peer_grp_data = bgp_data.setdefault("peer-group", {})

    if "peer-group" in bgp_data and bgp_peer_grp_data:
        peer_grp_data = __create_bgp_peer_group(tgen, bgp_peer_grp_data, router)
        config_data.extend(peer_grp_data)

    bst_path = bgp_data.setdefault("bestpath", None)
    if bst_path and "aspath" in bst_path:
        if "delete" in bst_path:
            config_data.append(
                "no bgp bestpath as-path {}".format(bst_path["aspath"])
            )
        else:
            config_data.append("bgp bestpath as-path {}".format(bst_path["aspath"]))

    if "graceful-restart" in bgp_data:
        graceful_config = bgp_data["graceful-restart"]

        # (input key, CLI command). None means "leave untouched", a truthy
        # value enables the knob and any other value emits the "no" form.
        gr_knobs = (
            ("graceful-restart", "bgp graceful-restart"),
            ("graceful-restart-disable", "bgp graceful-restart-disable"),
            ("preserve-fw-state", "bgp graceful-restart preserve-fw-state"),
            ("disable-eor", "bgp graceful-restart disable-eor"),
        )
        for key, knob_cmd in gr_knobs:
            knob = graceful_config.setdefault(key, None)
            if knob is None:
                continue
            config_data.append(knob_cmd if knob else "no {}".format(knob_cmd))

        if "timer" in graceful_config:
            timer = graceful_config["timer"]
            del_action = timer.get("delete", False)

            for rs_timer, rs_timer_value in timer.items():
                # "delete" is a modifier, not a timer; unset timers are skipped
                # so no stale command from an earlier step is appended.
                if not rs_timer_value or rs_timer == "delete":
                    continue

                cmd = "bgp graceful-restart {} {}".format(rs_timer, rs_timer_value)
                if del_action:
                    cmd = "no {}".format(cmd)

                config_data.append(cmd)

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return config_data
def __create_bgp_unicast_neighbor(
    tgen, topo, input_dict, router, afi_test, config_data=None
):
    """
    Helper API to create configuration for address-family unicast

    Parameters
    ----------
    * `tgen` : Topogen object
    * `topo` : json file data
    * `input_dict` : Input dict data, required when configuring from testcase
    * `router` : router id to be configured.
    * `afi_test` : use when ipv6 needs to be tested over ipv4 or vice-versa
    * `config_data` : list of commands generated so far; extended in place

    Returns
    -------
    list of config commands
    """
    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))

    if config_data is None:
        config_data = []

    # Flag handed to the neighbor helpers; cleared when the list already
    # holds a bare "router bgp" entry.
    add_neigh = "router bgp" not in config_data

    bgp_data = input_dict["address_family"]

    for addr_type, addr_dict in bgp_data.items():
        if not addr_dict:
            continue

        if not check_address_types(addr_type) and not afi_test:
            continue

        addr_data = addr_dict["unicast"]
        if addr_data:
            config_data.append("address-family {} unicast".format(addr_type))

        advertise_network = addr_data.setdefault("advertise_networks", [])
        for advertise_network_dict in advertise_network:
            network = advertise_network_dict["network"]
            if not isinstance(network, list):
                network = [network]

            no_of_network = advertise_network_dict.get("no_of_network", 1)
            del_action = advertise_network_dict.setdefault("delete", False)

            # Generating IPs for verification
            network_list = generate_ips(network, no_of_network)
            for ip in network_list:
                ip = str(ipaddress.ip_network(frr_unicode(ip)))

                cmd = "network {}".format(ip)
                if del_action:
                    cmd = "no {}".format(cmd)

                config_data.append(cmd)

        import_cmd = addr_data.setdefault("import", {})
        if import_cmd:
            if "delete" not in import_cmd:
                config_data.append("import vrf {}".format(import_cmd["vrf"]))
            elif import_cmd["delete"]:
                config_data.append("no import vrf {}".format(import_cmd["vrf"]))

        max_paths = addr_data.setdefault("maximum_paths", {})
        if max_paths:
            ibgp = max_paths.setdefault("ibgp", None)
            ebgp = max_paths.setdefault("ebgp", None)
            del_cmd = max_paths.setdefault("delete", False)
            prefix = "no " if del_cmd else ""
            if ibgp:
                config_data.append("{}maximum-paths ibgp {}".format(prefix, ibgp))
            if ebgp:
                config_data.append("{}maximum-paths {}".format(prefix, ebgp))

        aggregate_addresses = addr_data.setdefault("aggregate_address", [])
        for aggregate_address in aggregate_addresses:
            network = aggregate_address.setdefault("network", None)
            if not network:
                logger.debug(
                    "Router %s: 'network' not present in " "input_dict for BGP", router
                )
                continue

            cmd = "aggregate-address {}".format(network)

            as_set = aggregate_address.setdefault("as_set", False)
            summary = aggregate_address.setdefault("summary", False)
            del_action = aggregate_address.setdefault("delete", False)
            if as_set:
                cmd = "{} as-set".format(cmd)
            if summary:
                cmd = "{} summary".format(cmd)

            if del_action:
                cmd = "no {}".format(cmd)

            config_data.append(cmd)

        redistribute_data = addr_data.setdefault("redistribute", {})
        if redistribute_data:
            for redistribute in redistribute_data:
                if "redist_type" not in redistribute:
                    logger.debug(
                        "Router %s: 'redist_type' not present in " "input_dict", router
                    )
                    continue

                cmd = "redistribute {}".format(redistribute["redist_type"])
                redist_attr = redistribute.setdefault("attribute", None)
                if redist_attr:
                    if isinstance(redist_attr, dict):
                        for key, value in redist_attr.items():
                            cmd = "{} {} {}".format(cmd, key, value)
                    else:
                        cmd = "{} {}".format(cmd, redist_attr)

                del_action = redistribute.setdefault("delete", False)
                if del_action:
                    cmd = "no {}".format(cmd)
                config_data.append(cmd)

        admin_dist_data = addr_data.setdefault("distance", {})
        if admin_dist_data:
            del_action = admin_dist_data.setdefault("delete", False)
            if del_action:
                # The "no" form takes no values, so a delete-only dict is fine.
                config_data.append("no distance bgp")
            elif all(key in admin_dist_data for key in ("ebgp", "ibgp", "local")):
                config_data.append(
                    "distance bgp {} {} {}".format(
                        admin_dist_data["ebgp"],
                        admin_dist_data["ibgp"],
                        admin_dist_data["local"],
                    )
                )
            else:
                logger.debug(
                    "Router %s: pass the admin distance values for "
                    "ebgp, ibgp and local routes",
                    router,
                )

        # NOTE(review): "import" is processed a second time here, so the
        # import-vrf command can be emitted twice; kept to match the existing
        # generated configuration.
        import_vrf_data = addr_data.setdefault("import", {})
        if import_vrf_data:
            cmd = "import vrf {}".format(import_vrf_data["vrf"])

            del_action = import_vrf_data.setdefault("delete", False)
            if del_action:
                cmd = "no {}".format(cmd)
            config_data.append(cmd)

        if "neighbor" in addr_data:
            neigh_data = __create_bgp_neighbor(
                topo, input_dict, router, addr_type, add_neigh
            )
            config_data.extend(neigh_data)
        # configure default originate
        if "default_originate" in addr_data:
            default_originate_config = __create_bgp_default_originate_neighbor(
                topo, input_dict, router, addr_type, add_neigh
            )
            config_data.extend(default_originate_config)

    # Second pass: per-neighbor address-family settings.
    for addr_type, addr_dict in bgp_data.items():
        if not addr_dict or not check_address_types(addr_type):
            continue

        addr_data = addr_dict["unicast"]
        if "neighbor" in addr_data:
            neigh_addr_data = __create_bgp_unicast_address_family(
                topo, input_dict, router, addr_type, add_neigh
            )
            config_data.extend(neigh_addr_data)

    config_data.append("exit")
    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return config_data
def __create_bgp_default_originate_neighbor(
    topo, input_dict, router, addr_type, add_neigh=True
):
    """
    Helper API to create neighbor default - originate configuration

    Parameters
    ----------
    * `topo` : json file data
    * `input_dict` : Input dict data, required when configuring from testcase
    * `router` : router id to be configured
    * `addr_type` : address family being configured; also the key used to
                    pick the neighbor address from the link data
    * `add_neigh` : not used by this helper

    Returns
    -------
    list of config commands
    """
    config_data = []
    logger.debug("Entering lib API: __create_bgp_default_originate_neighbor()")

    bgp_data = input_dict["address_family"]
    neigh_data = bgp_data[addr_type]["unicast"]["default_originate"]
    for name, peer_dict in neigh_data.items():
        nh_details = topo[name]

        # Pick the peer link whose address identifies the neighbor:
        # explicit "dest-link", else "add_type", else the link towards `router`.
        if "dest-link" in peer_dict:
            link = peer_dict["dest-link"]
        elif "add_type" in peer_dict:
            link = peer_dict["add_type"]
        else:
            link = router
        neighbor_ip = nh_details["links"][link][addr_type].split("/")[0]

        config_data.append("address-family {} unicast".format(addr_type))

        # A missing or false "delete" both mean "configure".
        deleting = bool(peer_dict.get("delete"))

        if "route_map" in peer_dict:
            route_map = peer_dict["route_map"]
            if deleting:
                config_data.append(
                    "no neighbor {} default-originate route-map {}".format(
                        neighbor_ip, route_map
                    )
                )
            else:
                config_data.append(
                    " neighbor {} default-originate route-map {}".format(
                        neighbor_ip, route_map
                    )
                )
        elif deleting:
            config_data.append("no neighbor {} default-originate".format(neighbor_ip))
        else:
            config_data.append("neighbor {} default-originate".format(neighbor_ip))

    logger.debug("Exiting lib API: __create_bgp_default_originate_neighbor()")

    return config_data
def __create_l2vpn_evpn_address_family(
    tgen, topo, input_dict, router, config_data=None
):
    """
    Helper API to create configuration for l2vpn evpn address-family

    Parameters
    ----------
    * `tgen` : Topogen object
    * `topo` : json file data
    * `input_dict` : Input dict data, required when configuring
                     from testcase
    * `router` : router id to be configured.
    * `config_data` : list of commands generated so far; extended in place

    Returns
    -------
    list of config commands
    """
    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))

    if config_data is None:
        config_data = []

    bgp_data = input_dict["address_family"]

    for family_type, family_dict in bgp_data.items():
        if family_type != "l2vpn":
            continue

        family_data = family_dict["evpn"]
        if family_data:
            config_data.append("address-family l2vpn evpn")

        advertise_data = family_data.setdefault("advertise", {})
        neighbor_data = family_data.setdefault("neighbor", {})
        advertise_all_vni_data = family_data.setdefault("advertise-all-vni", None)
        rd_data = family_data.setdefault("rd", None)
        no_rd_data = family_data.setdefault("no rd", False)
        route_target_data = family_data.setdefault("route-target", {})

        for address_type, unicast_type in advertise_data.items():
            if not isinstance(unicast_type, dict):
                continue

            for key, value in unicast_type.items():
                cmd = "advertise {} {}".format(address_type, key)

                if value:
                    route_map = value.setdefault("route-map", {})
                    advertise_del_action = value.setdefault("delete", None)

                    if route_map:
                        cmd = "{} route-map {}".format(cmd, route_map)

                    if advertise_del_action:
                        cmd = "no {}".format(cmd)

                config_data.append(cmd)

        # Distinct loop names: the container must not be rebound while it is
        # being iterated.
        for neighbor, neighbor_afis in neighbor_data.items():
            ipv4_neighbor = neighbor_afis.setdefault("ipv4", {})
            ipv6_neighbor = neighbor_afis.setdefault("ipv6", {})

            for neighbor_name, action in ipv4_neighbor.items():
                neighbor_ip = topo[neighbor]["links"][neighbor_name]["ipv4"].split(
                    "/"
                )[0]

                if isinstance(action, dict):
                    next_hop_self = action.setdefault("next_hop_self", None)
                    route_maps = action.setdefault("route_maps", {})

                    if next_hop_self is True:
                        config_data.append(
                            "neighbor {} " "next-hop-self".format(neighbor_ip)
                        )
                    elif next_hop_self is False:
                        config_data.append(
                            "no neighbor {} " "next-hop-self".format(neighbor_ip)
                        )

                    for route_map in route_maps:
                        name = route_map.setdefault("name", {})
                        direction = route_map.setdefault("direction", "in")
                        del_action = route_map.setdefault("delete", False)

                        if not name:
                            logger.info(
                                "Router %s: 'name' "
                                "not present in "
                                "input_dict for BGP "
                                "neighbor route name",
                                router,
                            )
                            continue

                        cmd = "neighbor {} route-map {} " "{}".format(
                            neighbor_ip, name, direction
                        )

                        if del_action:
                            cmd = "no {}".format(cmd)

                        config_data.append(cmd)

                elif action == "activate":
                    config_data.append("neighbor {} activate".format(neighbor_ip))
                elif action == "deactivate":
                    config_data.append("no neighbor {} activate".format(neighbor_ip))

            # IPv6 neighbors come from the "ipv6" mapping; iterating the IPv4
            # mapping here would look up addresses for the wrong peers.
            for neighbor_name, action in ipv6_neighbor.items():
                neighbor_ip = topo[neighbor]["links"][neighbor_name]["ipv6"].split(
                    "/"
                )[0]
                if action == "activate":
                    config_data.append("neighbor {} activate".format(neighbor_ip))
                elif action == "deactivate":
                    config_data.append("no neighbor {} activate".format(neighbor_ip))

        # Equality (not identity) is kept so 1/0 behave like True/False;
        # None means "leave untouched".
        if advertise_all_vni_data == True:  # noqa: E712
            config_data.append("advertise-all-vni")
        elif advertise_all_vni_data == False:  # noqa: E712
            config_data.append("no advertise-all-vni")

        if rd_data:
            config_data.append("rd {}".format(rd_data))

        if no_rd_data:
            config_data.append("no rd {}".format(no_rd_data))

        for rt_type, rt_dict in route_target_data.items():
            for _rt_dict in rt_dict:
                rt_value = _rt_dict.setdefault("value", None)
                del_rt = _rt_dict.setdefault("delete", None)

                if not rt_value:
                    # Nothing to configure; avoids re-appending a stale command.
                    continue

                cmd = "route-target {} {}".format(rt_type, rt_value)
                if del_rt:
                    cmd = "no {}".format(cmd)

                config_data.append(cmd)

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))

    return config_data
def __create_bgp_peer_group(topo, input_dict, router):
    """
    Helper API to create peer-group specific configuration

    Parameters
    ----------
    * `topo` : json file data (not used by this helper)
    * `input_dict` : mapping of peer-group name to its settings
                     ("update-source", "remote-as", "capability")
    * `router` : router id to be configured (not used by this helper)

    Returns
    -------
    list of config commands
    """
    config_data = []
    logger.debug("Entering lib API: __create_bgp_peer_group()")

    for grp, grp_dict in input_dict.items():
        config_data.append("neighbor {} peer-group".format(grp))
        # The trailing space is part of the emitted command text.
        neigh_cxt = "neighbor {} ".format(grp)
        update_source = grp_dict.setdefault("update-source", None)
        remote_as = grp_dict.setdefault("remote-as", None)
        capability = grp_dict.setdefault("capability", None)

        if update_source:
            config_data.append("{} update-source {}".format(neigh_cxt, update_source))

        if remote_as:
            config_data.append("{} remote-as {}".format(neigh_cxt, remote_as))

        if capability:
            config_data.append("{} capability {}".format(neigh_cxt, capability))

    logger.debug("Exiting lib API: __create_bgp_peer_group()")

    return config_data
def __create_bgp_neighbor(topo, input_dict, router, addr_type, add_neigh=True):
    """
    Helper API to create neighbor specific configuration

    Parameters
    ----------
    * `topo` : json file data
    * `input_dict` : Input dict data, required when configuring from testcase
    * `router` : router id to be configured
    * `addr_type` : address family being configured
    * `add_neigh` : when False, the plain "remote-as" line is not emitted for
                    numbered neighbors

    Returns
    -------
    list of config commands

    Raises
    ------
    AssertionError when a neighbor's local_asn has local_as == remote_as
    """
    config_data = []
    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
    tgen = get_topogen()
    bgp_data = input_dict["address_family"]
    neigh_data = bgp_data[addr_type]["unicast"]["neighbor"]
    global_connect = input_dict.get("connecttimer", 5)

    for name, peer_dict in neigh_data.items():
        remote_as = 0
        for dest_link, peer in peer_dict["dest_link"].items():
            local_as = 0
            no_prepend = False
            replace_as = False
            local_asn = peer.setdefault("local_asn", {})
            if local_asn:
                local_as = local_asn.setdefault("local_as", 0)
                remote_as = local_asn.setdefault("remote_as", 0)
                no_prepend = local_asn.setdefault("no_prepend", False)
                replace_as = local_asn.setdefault("replace_as", False)
                if local_as == remote_as:
                    # Explicit raise: an assert statement is stripped under -O.
                    raise AssertionError(
                        " Configuration Error : Router must not have "
                        "same AS-NUMBER as Local AS NUMBER"
                    )
            nh_details = topo[name]

            # Derive the peer's AS from its own BGP data unless local_asn
            # already supplied one.
            if "vrfs" in topo[router] or isinstance(nh_details["bgp"], list):
                for vrf_data in nh_details["bgp"]:
                    if "vrf" in nh_details["links"][dest_link] and "vrf" in vrf_data:
                        if nh_details["links"][dest_link]["vrf"] == vrf_data["vrf"]:
                            if not remote_as:
                                remote_as = vrf_data["local_as"]
                            break
                    else:
                        if "vrf" not in vrf_data:
                            if not remote_as:
                                remote_as = vrf_data["local_as"]
                                break
            else:
                if not remote_as:
                    remote_as = nh_details["bgp"]["local_as"]

            update_source = None
            neighbor_type = peer.get("neighbor_type")

            if neighbor_type == "unnumbered":
                ip_addr = nh_details["links"][dest_link]["peer-interface"]
            elif neighbor_type == "link-local":
                intf = topo[name]["links"][dest_link]["interface"]
                ip_addr = get_frr_ipv6_linklocal(tgen, name, intf)
            elif dest_link in nh_details["links"]:
                try:
                    ip_addr = nh_details["links"][dest_link][addr_type].split("/")[0]
                except KeyError:
                    # No address of this family on the link: use link-local.
                    intf = topo[name]["links"][dest_link]["interface"]
                    ip_addr = get_frr_ipv6_linklocal(tgen, name, intf)

            if "delete" in peer and peer["delete"]:
                neigh_cxt = "no neighbor {}".format(ip_addr)
                config_data.append("{}".format(neigh_cxt))
                # NOTE(review): returns after the first deleted neighbor, so
                # later peers are not processed - confirm this is intended.
                return config_data

            neigh_cxt = "neighbor {}".format(ip_addr)

            if "peer-group" in peer:
                config_data.append(
                    "neighbor {} interface peer-group {}".format(
                        ip_addr, peer["peer-group"]
                    )
                )

            # Loopback interface
            if "source_link" in peer:
                if peer["source_link"] == "lo":
                    update_source = topo[router]["links"]["lo"][addr_type].split("/")[0]
                else:
                    update_source = topo[router]["links"][peer["source_link"]][
                        "interface"
                    ]

            if "peer-group" not in peer:
                if neighbor_type == "unnumbered":
                    config_data.append(
                        "{} interface remote-as {}".format(neigh_cxt, remote_as)
                    )
                elif add_neigh:
                    config_data.append("{} remote-as {}".format(neigh_cxt, remote_as))

            if local_asn and local_as:
                cmd = "{} local-as {}".format(neigh_cxt, local_as)
                if no_prepend:
                    cmd = "{} no-prepend".format(cmd)
                if replace_as:
                    cmd = "{} replace-as".format(cmd)
                config_data.append("{}".format(cmd))

            if addr_type == "ipv6":
                config_data.append("address-family ipv6 unicast")
                config_data.append("{} activate".format(neigh_cxt))

            if neighbor_type == "link-local":
                peer_intf = nh_details["links"][dest_link]["peer-interface"]
                config_data.append("{} update-source {}".format(neigh_cxt, peer_intf))
                config_data.append("{} interface {}".format(neigh_cxt, peer_intf))

            disable_connected = peer.setdefault("disable_connected_check", False)
            connect = peer.get("connecttimer", global_connect)
            keep_alive = peer.setdefault("keepalivetimer", 3)
            hold_down = peer.setdefault("holddowntimer", 10)
            password = peer.setdefault("password", None)
            no_password = peer.setdefault("no_password", None)
            capability = peer.setdefault("capability", None)
            max_hop_limit = peer.setdefault("ebgp_multihop", 1)

            graceful_restart = peer.setdefault("graceful-restart", None)
            graceful_restart_helper = peer.setdefault("graceful-restart-helper", None)
            graceful_restart_disable = peer.setdefault("graceful-restart-disable", None)

            if capability:
                config_data.append("{} capability {}".format(neigh_cxt, capability))

            if update_source:
                config_data.append(
                    "{} update-source {}".format(neigh_cxt, update_source)
                )
            if disable_connected:
                # The command is per neighbor, so it is prefixed with the
                # neighbor context rather than the flag value.
                config_data.append("{} disable-connected-check".format(neigh_cxt))

            # NOTE(review): timers are only emitted when BOTH values differ
            # from 60/180; kept as-is - confirm "or" was not intended.
            if int(keep_alive) != 60 and int(hold_down) != 180:
                config_data.append(
                    "{} timers {} {}".format(neigh_cxt, keep_alive, hold_down)
                )
            if int(connect) != 120:
                config_data.append("{} timers connect {}".format(neigh_cxt, connect))

            # Tri-state knobs: truthy enables, an explicit False emits the
            # "no" form, None leaves the knob untouched.
            gr_knobs = (
                (graceful_restart, "graceful-restart"),
                (graceful_restart_helper, "graceful-restart-helper"),
                (graceful_restart_disable, "graceful-restart-disable"),
            )
            for knob, keyword in gr_knobs:
                if knob:
                    config_data.append("{} {}".format(neigh_cxt, keyword))
                elif knob == False:  # noqa: E712
                    config_data.append("no {} {}".format(neigh_cxt, keyword))

            if password:
                config_data.append("{} password {}".format(neigh_cxt, password))

            if no_password:
                config_data.append("no {} password {}".format(neigh_cxt, no_password))

            if max_hop_limit > 1:
                config_data.append(
                    "{} ebgp-multihop {}".format(neigh_cxt, max_hop_limit)
                )
                config_data.append("{} enforce-multihop".format(neigh_cxt))

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return config_data
def __create_bgp_unicast_address_family(
    topo, input_dict, router, addr_type, add_neigh=True
):
    """
    Helper API to create per-neighbor configuration inside the
    "address-family <addr_type> unicast" context.

    Parameters
    ----------
    * `topo` : json file data
    * `input_dict` : Input dict data, required when configuring from testcase
    * `router` : router id to be configured (used in log messages)
    * `addr_type` : address family being configured
    * `add_neigh` : not used by this helper

    Returns
    -------
    list of config commands
    """
    config_data = []
    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
    tgen = get_topogen()
    bgp_data = input_dict["address_family"]
    neigh_data = bgp_data[addr_type]["unicast"]["neighbor"]

    # Iterate a copy: the setdefault() calls below must not leak defaults
    # into the caller's input.
    for peer_name, peer_dict in deepcopy(neigh_data).items():
        for dest_link, peer in peer_dict["dest_link"].items():
            deactivate = None
            nh_details = topo[peer_name]
            activate_addr_family = peer.setdefault("activate", None)
            deactivate_addr_family = peer.setdefault("deactivate", None)
            neighbor_type = peer.get("neighbor_type")

            # Loopback interface
            if "source_link" in peer and peer["source_link"] == "lo":
                for destRouterLink, data in sorted(nh_details["links"].items()):
                    if "type" in data and data["type"] == "loopback":
                        if dest_link == destRouterLink:
                            ip_addr = (
                                nh_details["links"][destRouterLink][addr_type]
                                .split("/")[0]
                                .lower()
                            )

            # Physical interface
            else:
                # check the neighbor type if un numbered nbr, use interface.
                if neighbor_type == "unnumbered":
                    ip_addr = nh_details["links"][dest_link]["peer-interface"]
                elif neighbor_type == "link-local":
                    intf = topo[peer_name]["links"][dest_link]["interface"]
                    ip_addr = get_frr_ipv6_linklocal(tgen, peer_name, intf)
                elif dest_link in nh_details["links"]:
                    try:
                        ip_addr = nh_details["links"][dest_link][addr_type].split("/")[
                            0
                        ]
                    except KeyError:
                        intf = topo[peer_name]["links"][dest_link]["interface"]
                        ip_addr = get_frr_ipv6_linklocal(tgen, peer_name, intf)
                    if (
                        addr_type == "ipv4"
                        and bgp_data["ipv6"]
                        and check_address_types("ipv6")
                        and "ipv6" in nh_details["links"][dest_link]
                    ):
                        # The peer's IPv6 address is deactivated under the
                        # IPv4 family below unless "activate" is requested.
                        deactivate = nh_details["links"][dest_link]["ipv6"].split("/")[
                            0
                        ]

            neigh_cxt = "neighbor {}".format(ip_addr)
            config_data.append("address-family {} unicast".format(addr_type))

            if activate_addr_family is not None:
                config_data.append(
                    "address-family {} unicast".format(activate_addr_family)
                )
                config_data.append("{} activate".format(neigh_cxt))

            if deactivate and activate_addr_family is None:
                config_data.append("no neighbor {} activate".format(deactivate))

            if deactivate_addr_family is not None:
                config_data.append(
                    "address-family {} unicast".format(deactivate_addr_family)
                )
                config_data.append("no {} activate".format(neigh_cxt))

            next_hop_self = peer.setdefault("next_hop_self", None)
            send_community = peer.setdefault("send_community", None)
            prefix_lists = peer.setdefault("prefix_lists", {})
            route_maps = peer.setdefault("route_maps", {})
            no_send_community = peer.setdefault("no_send_community", None)
            capability = peer.setdefault("capability", None)
            allowas_in = peer.setdefault("allowas-in", None)

            # next-hop-self
            if next_hop_self is not None:
                if next_hop_self is True:
                    config_data.append("{} next-hop-self".format(neigh_cxt))
                else:
                    config_data.append("no {} next-hop-self".format(neigh_cxt))

            # send_community
            if send_community:
                config_data.append("{} send-community".format(neigh_cxt))

            # no_send_community
            if no_send_community:
                config_data.append(
                    "no {} send-community {}".format(neigh_cxt, no_send_community)
                )

            # capability_ext_nh
            if capability and addr_type == "ipv6":
                config_data.append("address-family ipv4 unicast")
                config_data.append("{} activate".format(neigh_cxt))

            if "allowas_in" in peer:
                allow_as_in = peer["allowas_in"]
                config_data.append("{} allowas-in {}".format(neigh_cxt, allow_as_in))

            if "no_allowas_in" in peer:
                allow_as_in = peer["no_allowas_in"]
                config_data.append("no {} allowas-in {}".format(neigh_cxt, allow_as_in))

            if "shutdown" in peer:
                config_data.append(
                    "{} {} shutdown".format(
                        "no" if not peer["shutdown"] else "", neigh_cxt
                    )
                )

            for prefix_list in prefix_lists:
                name = prefix_list.setdefault("name", {})
                direction = prefix_list.setdefault("direction", "in")
                del_action = prefix_list.setdefault("delete", False)
                if not name:
                    logger.info(
                        "Router %s: 'name' not present in "
                        "input_dict for BGP neighbor prefix lists",
                        router,
                    )
                    continue

                cmd = "{} prefix-list {} {}".format(neigh_cxt, name, direction)
                if del_action:
                    cmd = "no {}".format(cmd)
                config_data.append(cmd)

            for route_map in route_maps:
                name = route_map.setdefault("name", {})
                direction = route_map.setdefault("direction", "in")
                del_action = route_map.setdefault("delete", False)
                if not name:
                    logger.info(
                        "Router %s: 'name' not present in "
                        "input_dict for BGP neighbor route name",
                        router,
                    )
                    continue

                cmd = "{} route-map {} {}".format(neigh_cxt, name, direction)
                if del_action:
                    cmd = "no {}".format(cmd)
                config_data.append(cmd)

            if allowas_in:
                number_occurences = allowas_in.setdefault("number_occurences", {})
                del_action = allowas_in.setdefault("delete", False)

                cmd = "{} allowas-in {}".format(neigh_cxt, number_occurences)

                if del_action:
                    cmd = "no {}".format(cmd)

                config_data.append(cmd)

    return config_data
def modify_bgp_config_when_bgpd_down(tgen, topo, input_dict):
    """
    API will save the current config to router's /etc/frr/ for BGPd
    daemon(bgpd.conf file)

    Parameters
    ----------
    * `tgen`  : Topogen object
    * `topo`  : json file data
    * `input_dict` : defines for which router, and which config
                     needs to be modified

    Usage
    -----
    # Modify graceful-restart config not to set f-bit
    # and write to /etc/frr

    # Api call to delete advertised networks
    input_dict = {
        "r5": {
            "bgp": {
                "address_family": {
                    "ipv4": {
                        "unicast": {
                            "advertise_networks": [
                                {"network": "101.0.20.1/32", "delete": True}
                            ],
                        }
                    },
                    "ipv6": {
                        "unicast": {
                            "advertise_networks": [
                                {"network": "5::1/128", "delete": True}
                            ],
                        }
                    },
                }
            }
        }
    }

    result = modify_bgp_config_when_bgpd_down(tgen, topo, input_dict)

    Returns
    -------
    True on success, the non-True result of create_router_bgp(), or the
    formatted traceback (str) when an exception is raised.
    """
    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
    try:
        # Generate the config without loading it into a running daemon.
        result = create_router_bgp(
            tgen, topo, input_dict, build=False, load_config=False
        )
        if result is not True:
            return result

        # Copy bgp config file to /etc/frr
        router_list = tgen.routers()
        for dut in input_dict:
            if dut not in router_list:
                continue

            logger.info("Delete BGP config when BGPd is down in {}".format(dut))
            # Reading the config from "rundir" and copy to /etc/frr/bgpd.conf
            cmd = "cat {}/{}/{} >> /etc/frr/bgpd.conf".format(
                tgen.logdir, dut, FRRCFG_FILE
            )
            router_list[dut].run(cmd)

    except Exception:
        # Best-effort API: report the failure to the caller as text.
        errormsg = traceback.format_exc()
        logger.error(errormsg)
        return errormsg

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return True
#############################################
# Verification APIs
#############################################
@retry(retry_timeout=8)
def verify_router_id(tgen, topo, input_dict, expected=True):
    """
    Running command "show bgp summary json" for DUT and reading router-id
    from input_dict and verifying with command output.
    1. Statically modified router-id should take place
    2. When static router-id is deleted highest loopback should
       become router-id
    3. When loopback intf is down then highest physical intf
       should become router-id

    Parameters
    ----------
    * `tgen`: topogen object
    * `topo`: input json file data
    * `input_dict`: input dictionary, have details of Device Under Test, for
                    which user wants to test the data
    * `expected` : expected results from API, by-default True
                   (not read in this body; presumably consumed by @retry)

    Usage
    -----
    # Verify if router-id for r1 is 12.12.12.12
    input_dict = {
        "r1": {
            "bgp": {
                "router_id": "12.12.12.12"
            }
        }
    }
    # Verify that router-id for r1 is highest interface ip
    input_dict = {
        "r1": {
            "bgp": {"del_router_id": True}
        }
    }
    result = verify_router_id(tgen, topo, input_dict)

    Returns
    -------
    errormsg(str) or True
    """
    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
    routers = tgen.routers()
    for router, router_data in input_dict.items():
        if router not in routers:
            continue

        rnode = routers[router]

        del_router_id = router_data["bgp"].setdefault("del_router_id", False)

        logger.info("Checking router %s router-id", router)
        show_bgp_json = run_frr_cmd(rnode, "show bgp summary json", isjson=True)
        router_id_out = show_bgp_json["ipv4Unicast"]["routerId"]
        router_id_out = ipaddress.IPv4Address(frr_unicode(router_id_out))

        # Once router-id is deleted, highest interface ip should become
        # router-id
        if del_router_id:
            router_id = find_interface_with_greater_ip(topo, router)
        else:
            router_id = router_data["bgp"]["router_id"]
        router_id = ipaddress.IPv4Address(frr_unicode(router_id))

        if router_id != router_id_out:
            errormsg = (
                "Router-id for router:{} mismatch, expected:"
                " {} but found:{}".format(router, router_id, router_id_out)
            )
            return errormsg

        logger.info("Found expected router-id %s for router %s", router_id, router)

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return True
def _bgp_unicast_peer_address(
    tgen, topo, bgp_neighbor, dest_link, addr_type, peer_details
):
    """Return the lower-cased key under which a unicast peer is listed in the summary."""
    links = topo["routers"][bgp_neighbor]["links"]

    if (
        "neighbor_type" in peer_details
        and peer_details["neighbor_type"] == "link-local"
    ):
        # for link local neighbors
        intf = links[dest_link]["interface"]
        neighbor_ip = get_frr_ipv6_linklocal(tgen, bgp_neighbor, intf)
    elif "source_link" in peer_details:
        neighbor_ip = links[peer_details["source_link"]][addr_type].split("/")[0]
    elif (
        "neighbor_type" in peer_details
        and peer_details["neighbor_type"] == "unnumbered"
    ):
        neighbor_ip = links[dest_link]["peer-interface"]
    else:
        neighbor_ip = links[dest_link][addr_type].split("/")[0]

    return neighbor_ip.lower()


@retry(retry_timeout=150)
def verify_bgp_convergence(tgen, topo=None, dut=None, expected=True):
    """
    API will verify if BGP is converged with in the given time frame.
    Running "show bgp vrf all summary json" command and verify bgp neighbor
    state is established for every neighbor configured in the topology.

    A neighbor (or VRF / address family) that is not yet listed in the
    summary is treated as "not established" so that the surrounding retry
    keeps polling instead of failing on a missing key.

    Parameters
    ----------
    * `tgen`: topogen object
    * `topo`: input json file data, defaults to tgen.json_topo
    * `dut`: device under test, all routers with "bgp" data when None
    * `expected` : expected results from API, by-default True (not read
                   inside this function)

    Usage
    -----
    # To verify if BGP is converged for all the routers used in
    topology
    results = verify_bgp_convergence(tgen, topo, dut="r1")

    Returns
    -------
    errormsg(str) or True; False when no router was checked
    """

    if topo is None:
        topo = tgen.json_topo

    result = False
    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
    tgen = get_topogen()
    for router, rnode in tgen.routers().items():
        if "bgp" not in topo["routers"][router]:
            continue

        if dut is not None and dut != router:
            continue

        logger.info("Verifying BGP Convergence on router %s:", router)
        show_bgp_json = run_frr_cmd(rnode, "show bgp vrf all summary json", isjson=True)
        # Verifying output dictionary show_bgp_json is empty or not
        if not show_bgp_json:
            return "BGP is not running"

        # "bgp" is either one dict or a list of per-VRF dicts
        bgp_data_list = topo["routers"][router]["bgp"]
        if type(bgp_data_list) is not list:
            bgp_data_list = [bgp_data_list]

        for bgp_data in bgp_data_list:
            vrf = bgp_data.get("vrf")
            if vrf is None:
                vrf = "default"

            # A VRF missing from the output simply has no peers yet
            vrf_summary = show_bgp_json.get(vrf, {})

            # To find neighbor ip type
            bgp_addr_type = bgp_data["address_family"]
            if "l2vpn" in bgp_addr_type:
                if "neighbor" not in bgp_addr_type["l2vpn"]["evpn"]:
                    continue

                bgp_neighbors = bgp_addr_type["l2vpn"]["evpn"]["neighbor"]
                total_evpn_peer = len(bgp_neighbors)

                no_of_evpn_peer = 0
                for bgp_neighbor, peer_data in bgp_neighbors.items():
                    data = topo["routers"][bgp_neighbor]["links"]
                    for _addr_type, dest_link_dict in peer_data.items():
                        for dest_link in dest_link_dict:
                            if dest_link not in data:
                                continue

                            neighbor_ip = data[dest_link][_addr_type].split("/")[0]

                            if (
                                "ipv4Unicast" in vrf_summary
                                or "ipv6Unicast" in vrf_summary
                            ):
                                return (
                                    "[DUT: %s] VRF: %s, "
                                    "ipv4Unicast/ipv6Unicast"
                                    " address-family present"
                                    " under l2vpn" % (router, vrf)
                                )

                            evpn_peers = vrf_summary.get("l2VpnEvpn", {}).get(
                                "peers", {}
                            )
                            nh_state = evpn_peers.get(neighbor_ip, {}).get("state")

                            if nh_state == "Established":
                                no_of_evpn_peer += 1

                if no_of_evpn_peer == total_evpn_peer:
                    logger.info(
                        "[DUT: %s] VRF: %s, BGP is Converged for epvn peers",
                        router,
                        vrf,
                    )
                    result = True
                else:
                    return (
                        "[DUT: %s] VRF: %s, BGP is not converged "
                        "for evpn peers" % (router, vrf)
                    )
            else:
                total_peer = 0
                no_of_peer = 0
                for addr_type, af_data in bgp_addr_type.items():
                    if not check_address_types(addr_type):
                        continue

                    if addr_type == "ipv4":
                        af_key = "ipv4Unicast"
                    else:
                        af_key = "ipv6Unicast"
                    summary_peers = vrf_summary.get(af_key, {}).get("peers", {})

                    bgp_neighbors = af_data["unicast"]["neighbor"]
                    for bgp_neighbor, peer_data in bgp_neighbors.items():
                        # Every configured link counts, resolvable or not
                        total_peer += len(peer_data["dest_link"])

                        data = topo["routers"][bgp_neighbor]["links"]
                        for dest_link, peer_details in peer_data["dest_link"].items():
                            if dest_link not in data:
                                continue

                            neighbor_ip = _bgp_unicast_peer_address(
                                tgen,
                                topo,
                                bgp_neighbor,
                                dest_link,
                                addr_type,
                                peer_details,
                            )
                            nh_state = summary_peers.get(neighbor_ip, {}).get("state")

                            if nh_state == "Established":
                                no_of_peer += 1

                if no_of_peer == total_peer and no_of_peer > 0:
                    logger.info("[DUT: %s] VRF: %s, BGP is Converged", router, vrf)
                    result = True
                else:
                    return "[DUT: %s] VRF: %s, BGP is not converged" % (router, vrf)

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return result
@retry(retry_timeout=16)
def verify_bgp_community(
    tgen,
    addr_type,
    router,
    network,
    input_dict=None,
    vrf=None,
    bestpath=False,
    expected=True,
):
    """
    API to verify BGP (large) community is attached in route for any given
    DUT by running "show bgp [vrf <vrf>] ipv4/6 {route address} [bestpath]
    json" command.

    Parameters
    ----------
    * `tgen`: topogen object
    * `addr_type` : ip type, ipv4/ipv6
    * `router`: Device Under Test
    * `network`: list of networks for which set criteria needs to be verified
    * `input_dict`: attribute name ("community"/"largeCommunity") mapped to
            the expected community string; None only checks presence
    * `vrf`: VRF name, takes precedence over `bestpath`
    * `bestpath`: To check best path cli
    * `expected` : expected results from API, by-default True (not read
                   inside this function)

    Usage
    -----
    networks = ["200.50.2.0/32"]
    input_dict = {
        "largeCommunity": "2:1:1 2:2:2 2:3:3 2:4:4 2:5:5"
    }
    result = verify_bgp_community(tgen, "ipv4", dut, networks, input_dict)

    Returns
    -------
    errormsg(str) or True; False when `router` is not part of the topology
    """

    logger.debug("Entering lib API: verify_bgp_community()")
    if router not in tgen.routers():
        return False

    rnode = tgen.routers()[router]

    logger.info(
        "Verifying BGP community attributes on dut %s: for %s network %s",
        router,
        addr_type,
        network,
    )

    command = "show bgp"

    for net in network:
        if vrf:
            cmd = "{} vrf {} {} {} json".format(command, vrf, addr_type, net)
        elif bestpath:
            cmd = "{} {} {} bestpath json".format(command, addr_type, net)
        else:
            cmd = "{} {} {} json".format(command, addr_type, net)

        show_bgp_json = run_frr_cmd(rnode, cmd, isjson=True)
        if "paths" not in show_bgp_json:
            return "Prefix {} not found in BGP table of router: {}".format(net, router)

        found = False
        for path in show_bgp_json["paths"]:
            if "largeCommunity" not in path and "community" not in path:
                continue

            found = True
            logger.info(
                "Large Community attribute is found for route: %s in router: %s",
                net,
                router,
            )
            if input_dict is None:
                continue

            for criteria, comm_val in input_dict.items():
                # A path may carry only one of the two community kinds;
                # report that as a mismatch instead of raising KeyError.
                attr = path.get(criteria)
                show_val = attr.get("string") if isinstance(attr, dict) else attr
                if comm_val == show_val:
                    logger.info(
                        "Verifying BGP %s for prefix: %s"
                        " in router: %s, found expected"
                        " value: %s",
                        criteria,
                        net,
                        router,
                        comm_val,
                    )
                else:
                    return (
                        "Failed: Verifying BGP attribute"
                        " {} for route: {} in router: {}"
                        ", expected value: {} but found"
                        ": {}".format(criteria, net, router, comm_val, show_val)
                    )

        if not found:
            return (
                "Large Community attribute is not found for route: "
                "{} in router: {} ".format(net, router)
            )

    logger.debug("Exiting lib API: verify_bgp_community()")
    return True
def modify_as_number(tgen, topo, input_dict):
    """
    Re-configure BGP on the given routers with a new local AS number.

    A copy of topo["routers"] is updated with the "local_as" taken from
    input_dict, the existing BGP configuration of those routers is deleted
    through create_router_bgp() and the updated copy is then applied.

    Parameters
    ----------
    * `tgen`  : Topogen object
    * `topo`  : json file data
    * `input_dict` :  defines for which router ASNs needs to be modified

    Usage
    -----
    To modify ASNs for router r1
    input_dict = {
        "r1": {
            "bgp": {
                "local_as": 131079
            }
        }
    }
    result = modify_as_number(tgen, topo, input_dict)

    Returns
    -------
    errormsg(str) or True
    """

    api_name = sys._getframe().f_code.co_name
    logger.debug("Entering lib API: {}".format(api_name))
    try:
        updated_routers = deepcopy(topo["routers"])
        removal_dict = {}
        for router, router_input in input_dict.items():
            # Remove bgp configuration
            removal_dict[router] = {"bgp": {"delete": True}}

            wanted_as = router_input["bgp"]["local_as"]
            bgp_section = updated_routers[router]["bgp"]
            try:
                bgp_section["local_as"] = wanted_as
            except TypeError:
                # "bgp" is a list of instances: update the first one
                bgp_section[0]["local_as"] = wanted_as

        logger.info("Removing bgp configuration")
        create_router_bgp(tgen, topo, removal_dict)

        logger.info("Applying modified bgp configuration")
        result = create_router_bgp(tgen, updated_routers)
        if result is not True:
            result = "Error applying new AS number config"
    except Exception:
        errormsg = traceback.format_exc()
        logger.error(errormsg)
        return errormsg

    logger.debug("Exiting lib API: {}".format(api_name))
    return result
@retry(retry_timeout=8)
def verify_as_numbers(tgen, topo, input_dict, expected=True):
    """
    This API is to verify AS numbers for given DUT by running
    "show ip bgp neighbor json" command. Local AS and Remote AS
    will be verified with input_dict data and command output.

    The expected remote AS of a neighbor is that neighbor's own "local_as"
    in input_dict. Links that are not present in the neighbor's topology
    data are skipped, and a neighbor missing from the command output is
    reported as an error message rather than raising KeyError.

    Parameters
    ----------
    * `tgen`: topogen object
    * `topo`: input json file data
    * `input_dict`: defines - for which router, AS numbers needs to be verified
    * `expected` : expected results from API, by-default True (not read
                   inside this function)

    Usage
    -----
    input_dict = {
        "r1": {
            "bgp": {
                "local_as": 131079
            }
        }
    }
    result = verify_as_numbers(tgen, topo, input_dict)

    Returns
    -------
    errormsg(str) or True
    """

    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
    for router in input_dict.keys():
        if router not in tgen.routers():
            continue

        rnode = tgen.routers()[router]

        logger.info("Verifying AS numbers for dut %s:", router)

        show_ip_bgp_neighbor_json = run_frr_cmd(
            rnode, "show ip bgp neighbor json", isjson=True
        )
        local_as = input_dict[router]["bgp"]["local_as"]
        bgp_addr_type = topo["routers"][router]["bgp"]["address_family"]

        for addr_type in bgp_addr_type:
            if not check_address_types(addr_type):
                continue

            bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]

            for bgp_neighbor, peer_data in bgp_neighbors.items():
                remote_as = input_dict[bgp_neighbor]["bgp"]["local_as"]
                data = topo["routers"][bgp_neighbor]["links"]

                for dest_link in peer_data["dest_link"]:
                    if dest_link not in data:
                        # No address can be derived for this link
                        logger.debug(
                            "Link %s not found in topology of %s, skipping",
                            dest_link,
                            bgp_neighbor,
                        )
                        continue

                    neighbor_ip = data[dest_link][addr_type].split("/")[0]
                    if neighbor_ip not in show_ip_bgp_neighbor_json:
                        return (
                            "Failed: neighbor {} ({}) not found in "
                            "'show ip bgp neighbor json' output of dut {}".format(
                                bgp_neighbor, neighbor_ip, router
                            )
                        )
                    neigh_data = show_ip_bgp_neighbor_json[neighbor_ip]

                    # Verify Local AS for router
                    if neigh_data["localAs"] != local_as:
                        return (
                            "Failed: Verify local_as for dut {},"
                            " found: {} but expected: {}".format(
                                router, neigh_data["localAs"], local_as
                            )
                        )
                    logger.info(
                        "Verified local_as for dut %s, found expected: %s",
                        router,
                        local_as,
                    )

                    # Verify Remote AS for neighbor
                    if neigh_data["remoteAs"] != remote_as:
                        return (
                            "Failed: Verify remote_as for dut "
                            "{}'s neighbor {}, found: {} but "
                            "expected: {}".format(
                                router, bgp_neighbor, neigh_data["remoteAs"], remote_as
                            )
                        )
                    logger.info(
                        "Verified remote_as for dut %s's "
                        "neighbor %s, found expected: %s",
                        router,
                        bgp_neighbor,
                        remote_as,
                    )

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return True
@retry(retry_timeout=150)
def verify_bgp_convergence_from_running_config(tgen, dut=None, expected=True):
    """
    API to verify BGP convergence using only the running state of the
    routers: every peer listed by "show bgp vrf all summary json" must be
    in state "Established". The topology data is not consulted, so this
    can be used when the neighborship is loopback to physical or
    vice-versa.

    Parameters
    ----------
    * `tgen`: topogen object
    * `dut`: device under test, all routers when None
    * `expected` : expected results from API, by-default True (not read
                   inside this function)

    Usage
    -----
    results = verify_bgp_convergence_from_running_config(tgen, dut="r1")

    Returns
    -------
    errormsg(str) or True
    """

    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))

    for router, rnode in tgen.routers().items():
        if dut is not None and dut != router:
            continue

        logger.info("Verifying BGP Convergence on router %s:", router)
        show_bgp_json = run_frr_cmd(rnode, "show bgp vrf all summary json", isjson=True)
        # Verifying output dictionary show_bgp_json is empty or not
        if not show_bgp_json:
            return "BGP is not running"

        for vrf, addr_family_data in show_bgp_json.items():
            for neighborship_data in addr_family_data.values():
                # Name the peers that are actually down; the previous code
                # reported whichever peer happened to be iterated last.
                down_peers = [
                    peer
                    for peer, peer_data in neighborship_data["peers"].items()
                    if peer_data["state"] != "Established"
                ]
                if down_peers:
                    return (
                        "[DUT: %s] VRF: %s, BGP is not converged"
                        " for peer: %s" % (router, vrf, ", ".join(down_peers))
                    )

            logger.info("[DUT: %s]: vrf: %s, BGP is Converged", router, vrf)

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))

    return True
def clear_bgp(tgen, addr_type, router, vrf=None, neighbor=None):
    """
    Reset BGP sessions on a router by issuing the matching "clear" command.

    For "ipv4" and "ipv6" the command is selected in this order: one
    command per VRF when `vrf` is given, else a single neighbor when
    `neighbor` is given, else all sessions of that address family. Any
    other `addr_type` clears every session with "clear bgp *".

    Parameters
    ----------
    * `tgen`: topogen object
    * `addr_type`: ip type ipv4/ipv6
    * `router`: device under test
    * `vrf`: vrf name, or list of vrf names
    * `neighbor`: Neighbor for which bgp needs to be cleared

    Usage
    -----
    clear_bgp(tgen, addr_type, "r1")

    Returns
    -------
    False when `router` is not part of the topology, otherwise None
    """

    api_name = sys._getframe().f_code.co_name
    logger.debug("Entering lib API: {}".format(api_name))

    known_routers = tgen.routers()
    if router not in known_routers:
        return False

    rnode = known_routers[router]

    # A single VRF name is handled as a one-element list
    if vrf and type(vrf) is not list:
        vrf = [vrf]

    # Clearing BGP
    logger.info("Clearing BGP neighborship for router %s..", router)
    if addr_type == "ipv4":
        if vrf:
            commands = ["clear ip bgp vrf {} *".format(name) for name in vrf]
        elif neighbor:
            commands = ["clear bgp ipv4 {}".format(neighbor)]
        else:
            commands = ["clear ip bgp *"]
    elif addr_type == "ipv6":
        if vrf:
            commands = ["clear bgp vrf {} ipv6 *".format(name) for name in vrf]
        elif neighbor:
            commands = ["clear bgp ipv6 {}".format(neighbor)]
        else:
            commands = ["clear bgp ipv6 *"]
    else:
        commands = ["clear bgp *"]

    for command in commands:
        run_frr_cmd(rnode, command)

    logger.debug("Exiting lib API: {}".format(api_name))
def _wait_for_bgp_peers_established(
    rnode, topo, router, phase, peer_uptime, sleeptime=3, retries=50
):
    """
    Poll "show bgp summary json" until all configured peers are Established.

    `phase` ("before"/"after") is only used in log messages. `peer_uptime`
    is filled with peerUptimeEstablishedEpoch per neighbor router.

    Returns (errormsg, bgp_addr_type); errormsg is None on success and
    bgp_addr_type is the router's "address_family" topology data (None if
    it was never read).
    """
    bgp_addr_type = None
    for _attempt in range(retries):
        # Waiting for BGP to converge
        logger.info(
            "Waiting for %s sec for BGP to converge on router %s...",
            sleeptime,
            router,
        )
        sleep(sleeptime)

        show_bgp_json = run_frr_cmd(rnode, "show bgp summary json", isjson=True)
        # Verifying output dictionary show_bgp_json is empty or not
        if not show_bgp_json:
            return "BGP is not running", bgp_addr_type

        # "bgp" is either a dict or a list of instances (first one is used)
        try:
            bgp_addr_type = topo["routers"][router]["bgp"]["address_family"]
        except TypeError:
            bgp_addr_type = topo["routers"][router]["bgp"][0]["address_family"]

        total_peer = 0
        no_of_peer = 0
        for addr_type, af_data in bgp_addr_type.items():
            # Same filter for the total and for the established count,
            # otherwise the two numbers can never match when an address
            # family is disabled.
            if not check_address_types(addr_type):
                continue

            if addr_type == "ipv4":
                summary_key = "ipv4Unicast"
            else:
                summary_key = "ipv6Unicast"

            for bgp_neighbor, peer_data in af_data["unicast"]["neighbor"].items():
                total_peer += len(peer_data["dest_link"])
                data = topo["routers"][bgp_neighbor]["links"]

                for dest_link in peer_data["dest_link"]:
                    if dest_link not in data:
                        continue

                    neighbor_ip = data[dest_link][addr_type].split("/")[0]
                    peer_summary = show_bgp_json[summary_key]["peers"][neighbor_ip]
                    nh_state = peer_summary["state"]

                    # Peer up time dictionary
                    peer_uptime[bgp_neighbor] = peer_summary[
                        "peerUptimeEstablishedEpoch"
                    ]

                    if nh_state == "Established":
                        no_of_peer += 1

        if no_of_peer == total_peer:
            logger.info("BGP is Converged for router %s %s bgp clear", router, phase)
            return None, bgp_addr_type

        logger.info(
            "BGP is not yet Converged for router %s %s bgp clear", router, phase
        )

    errormsg = (
        "TIMEOUT!! BGP is not converged in {} seconds for"
        " router {}".format(retries * sleeptime, router)
    )
    return errormsg, bgp_addr_type


def clear_bgp_and_verify(tgen, topo, router, rid=None):
    """
    This API is to clear bgp neighborship and verify bgp neighborship
    is coming up (BGP is converged) using "show bgp summary json" command
    and also verifying for all bgp neighbors uptime before and after
    clear bgp sessions is different as the uptime must be changed once
    bgp sessions are cleared using "clear bgp ipv4 */clear bgp ipv6 *" cmd.

    Parameters
    ----------
    * `tgen`: topogen object
    * `topo`: input json file data
    * `router`: device under test
    * `rid`: when given, only this neighbor is cleared instead of "*"

    Usage
    -----
    result = clear_bgp_and_verify(tgen, topo, dut)

    Returns
    -------
    errormsg(str) or True; False when `router` is not part of the topology
    """

    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))

    if router not in tgen.routers():
        return False

    rnode = tgen.routers()[router]

    # Verifying BGP convergence before bgp clear command
    peer_uptime_before_clear_bgp = {}
    errormsg, bgp_addr_type = _wait_for_bgp_peers_established(
        rnode, topo, router, "before", peer_uptime_before_clear_bgp
    )
    if errormsg is not None:
        return errormsg

    # Clearing BGP
    logger.info("Clearing BGP neighborship for router %s..", router)
    clear_target = rid if rid else "*"
    for addr_type in bgp_addr_type:
        if addr_type in ("ipv4", "ipv6"):
            run_frr_cmd(rnode, "clear bgp {} {}".format(addr_type, clear_target))

    # Verifying BGP convergence after bgp clear command
    peer_uptime_after_clear_bgp = {}
    errormsg, _ = _wait_for_bgp_peers_established(
        rnode, topo, router, "after", peer_uptime_after_clear_bgp
    )
    if errormsg is not None:
        return errormsg

    # Comparing peerUptimeEstablishedEpoch dictionaries
    if peer_uptime_before_clear_bgp == peer_uptime_after_clear_bgp:
        return (
            "BGP neighborship is not reset after clear bgp on router"
            " {}".format(router)
        )
    logger.info("BGP neighborship is reset after clear BGP on router %s", router)

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))

    return True
def verify_bgp_timers_and_functionality(tgen, topo, input_dict):
    """
    To verify BGP timer config, execute "show ip bgp neighbor json" command
    and verify bgp timers with input_dict data.
    To verify bgp timers functionality, shut down the peer interface
    and verify BGP neighborship status with "show bgp summary json".

    Two scenarios are run for every neighbor link:
    1. The peer interface is shut and brought back; the session must still
       be "Established" at the (holddown - keepalive) check point.
    2. The peer interface is shut and left down; the session must no longer
       be "Established" at the holddown check point.

    Parameters
    ----------
    * `tgen`: topogen object
    * `topo`: input json file data
    * `input_dict`: defines for which router, bgp timers needs to be verified;
                    timers are read from
                    input_dict[router]["bgp"]["address_family"][addr_type]
                    ["unicast"]["neighbor"][peer]["dest_link"][link]

    Usage
    -----
    # To verify BGP timers for neighbor r2 of router r1
    input_dict = {
        "r1": {
            "bgp": {
                "address_family": {
                    "ipv4": {
                        "unicast": {
                            "neighbor": {
                                "r2": {
                                    "dest_link": {
                                        "r1": {
                                            "keepalivetimer": 5,
                                            "holddowntimer": 15,
                                        }}}}}}}}}}
    result = verify_bgp_timers_and_functionality(tgen, topo, input_dict)

    Returns
    -------
    errormsg(str) or True
    """

    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
    sleep(5)
    router_list = tgen.routers()
    for router in input_dict.keys():
        if router not in router_list:
            continue

        rnode = router_list[router]

        logger.info("Verifying bgp timers functionality, DUT is %s:", router)

        show_ip_bgp_neighbor_json = run_frr_cmd(
            rnode, "show ip bgp neighbor json", isjson=True
        )

        bgp_addr_type = input_dict[router]["bgp"]["address_family"]

        for addr_type in bgp_addr_type:
            if not check_address_types(addr_type):
                continue

            bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]
            for bgp_neighbor, peer_data in bgp_neighbors.items():
                for dest_link, peer_dict in peer_data["dest_link"].items():
                    data = topo["routers"][bgp_neighbor]["links"]

                    keepalivetimer = peer_dict["keepalivetimer"]
                    holddowntimer = peer_dict["holddowntimer"]

                    # NOTE(review): when dest_link is not in data,
                    # neighbor_ip/neighbor_intf keep the value of the
                    # previous iteration (or are unbound) - confirm that
                    # this cannot happen with valid topologies.
                    if dest_link in data:
                        neighbor_ip = data[dest_link][addr_type].split("/")[0]
                        neighbor_intf = data[dest_link]["interface"]

                    # Verify HoldDownTimer for neighbor
                    # (command output is in msec, input_dict in sec)
                    bgpHoldTimeMsecs = show_ip_bgp_neighbor_json[neighbor_ip][
                        "bgpTimerHoldTimeMsecs"
                    ]
                    if bgpHoldTimeMsecs != holddowntimer * 1000:
                        errormsg = (
                            "Verifying holddowntimer for bgp "
                            "neighbor {} under dut {}, found: {} "
                            "but expected: {}".format(
                                neighbor_ip,
                                router,
                                bgpHoldTimeMsecs,
                                holddowntimer * 1000,
                            )
                        )
                        return errormsg

                    # Verify KeepAliveTimer for neighbor
                    bgpKeepAliveTimeMsecs = show_ip_bgp_neighbor_json[neighbor_ip][
                        "bgpTimerKeepAliveIntervalMsecs"
                    ]
                    if bgpKeepAliveTimeMsecs != keepalivetimer * 1000:
                        errormsg = (
                            "Verifying keepalivetimer for bgp "
                            "neighbor {} under dut {}, found: {} "
                            "but expected: {}".format(
                                neighbor_ip,
                                router,
                                bgpKeepAliveTimeMsecs,
                                keepalivetimer * 1000,
                            )
                        )
                        return errormsg

                    ####################
                    # Shutting down peer interface after keepalive time and
                    # after some time bringing up peer interface.
                    # verifying BGP neighborship in (hold down-keep alive)
                    # time, it should not go down
                    ####################

                    # Wait till keep alive time
                    logger.info("=" * 20)
                    logger.info("Scenario 1:")
                    logger.info(
                        "Shutdown and bring up peer interface: %s "
                        "in keep alive time : %s sec and verify "
                        " BGP neighborship is intact in %s sec ",
                        neighbor_intf,
                        keepalivetimer,
                        (holddowntimer - keepalivetimer),
                    )
                    logger.info("=" * 20)
                    logger.info("Waiting for %s sec..", keepalivetimer)
                    sleep(keepalivetimer)

                    # Shutting down peer interface
                    logger.info(
                        "Shutting down interface %s on router %s",
                        neighbor_intf,
                        bgp_neighbor,
                    )
                    topotest.interface_set_status(
                        router_list[bgp_neighbor], neighbor_intf, ifaceaction=False
                    )

                    # Bringing up peer interface
                    sleep(5)
                    logger.info(
                        "Bringing up interface %s on router %s..",
                        neighbor_intf,
                        bgp_neighbor,
                    )
                    topotest.interface_set_status(
                        router_list[bgp_neighbor], neighbor_intf, ifaceaction=True
                    )

                    # Verifying BGP neighborship is intact in
                    # (holddown - keepalive) time
                    # NOTE(review): `timer` is a range value, not elapsed
                    # time; the state is only asserted when a range value
                    # equals (holddowntimer - keepalivetimer), which depends
                    # on the step int(holddowntimer / 3) - confirm for
                    # timers other than 5/15.
                    for timer in range(
                        keepalivetimer, holddowntimer, int(holddowntimer / 3)
                    ):
                        logger.info("Waiting for %s sec..", keepalivetimer)
                        sleep(keepalivetimer)
                        sleep(2)
                        show_bgp_json = run_frr_cmd(
                            rnode, "show bgp summary json", isjson=True
                        )

                        if addr_type == "ipv4":
                            ipv4_data = show_bgp_json["ipv4Unicast"]["peers"]
                            nh_state = ipv4_data[neighbor_ip]["state"]
                        else:
                            ipv6_data = show_bgp_json["ipv6Unicast"]["peers"]
                            nh_state = ipv6_data[neighbor_ip]["state"]

                        if timer == (holddowntimer - keepalivetimer):
                            # NOTE(review): this branch fires when the
                            # session is NOT Established, yet the message
                            # says "has not gone down" - looks copied from
                            # scenario 2; confirm the intended wording.
                            if nh_state != "Established":
                                errormsg = (
                                    "BGP neighborship has not gone "
                                    "down in {} sec for neighbor {}".format(
                                        timer, bgp_neighbor
                                    )
                                )
                                return errormsg
                            else:
                                logger.info(
                                    "BGP neighborship is intact in %s"
                                    " sec for neighbor %s",
                                    timer,
                                    bgp_neighbor,
                                )

                    ####################
                    # Shutting down peer interface and verifying that BGP
                    # neighborship is going down in holddown time
                    ####################
                    logger.info("=" * 20)
                    logger.info("Scenario 2:")
                    logger.info(
                        "Shutdown peer interface: %s and verify BGP"
                        " neighborship has gone down in hold down "
                        "time %s sec",
                        neighbor_intf,
                        holddowntimer,
                    )
                    logger.info("=" * 20)

                    logger.info(
                        "Shutting down interface %s on router %s..",
                        neighbor_intf,
                        bgp_neighbor,
                    )
                    topotest.interface_set_status(
                        router_list[bgp_neighbor], neighbor_intf, ifaceaction=False
                    )

                    # Verifying BGP neighborship is going down in holddown time
                    for timer in range(
                        keepalivetimer,
                        (holddowntimer + keepalivetimer),
                        int(holddowntimer / 3),
                    ):
                        logger.info("Waiting for %s sec..", keepalivetimer)
                        sleep(keepalivetimer)
                        sleep(2)
                        show_bgp_json = run_frr_cmd(
                            rnode, "show bgp summary json", isjson=True
                        )

                        if addr_type == "ipv4":
                            ipv4_data = show_bgp_json["ipv4Unicast"]["peers"]
                            nh_state = ipv4_data[neighbor_ip]["state"]
                        else:
                            ipv6_data = show_bgp_json["ipv6Unicast"]["peers"]
                            nh_state = ipv6_data[neighbor_ip]["state"]

                        # The session must be down once the range value
                        # reaches the hold-down timer
                        if timer == holddowntimer:
                            if nh_state == "Established":
                                errormsg = (
                                    "BGP neighborship has not gone "
                                    "down in {} sec for neighbor {}".format(
                                        timer, bgp_neighbor
                                    )
                                )
                                return errormsg
                            else:
                                logger.info(
                                    "BGP neighborship has gone down in"
                                    " %s sec for neighbor %s",
                                    timer,
                                    bgp_neighbor,
                                )

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return True
@retry(retry_timeout=16)
def verify_bgp_attributes(
    tgen,
    addr_type,
    dut,
    static_routes,
    rmap_name=None,
    input_dict=None,
    seq_id=None,
    vrf=None,
    nexthop=None,
    expected=True,
):
    """
    API will verify BGP attributes set by Route-map for given prefix and
    DUT. It will run "show bgp [vrf <vrf>] ipv4/ipv6 {prefix_address} json"
    command in DUT to verify BGP attributes set by route-map. Set attribute
    values will be read from input_dict and verified with command output.

    For every "set" criteria of the selected route-map entries, at least
    one path of the prefix must carry that attribute with the expected
    value.

    Parameters
    ----------
    * `tgen`: topogen object
    * `addr_type` : ip type, ipv4/ipv6
    * `dut`: Device Under Test
    * `static_routes`: Static Routes for which BGP set attributes needs to be
                       verified
    * `rmap_name`: route map name for which set criteria needs to be verified
    * `input_dict`: per-router data holding the "route_maps" definitions
    * `seq_id`: sequence number (or list of them) of rmap, default is None
    * `vrf`: VRF name, default is None
    * `nexthop`: not read inside this function
    * `expected` : expected results from API, by-default True (not read
                   inside this function)

    Usage
    -----
    # To verify BGP attribute "localpref" set to 150 and "med" set to 30
    for prefix 10.0.20.1/32 in router r3.
    input_dict = {
        "r3": {
            "route_maps": {
                "rmap_match_pf_list1": [
                    {
                        "action": "PERMIT",
                        "match": {"prefix_list": "pf_list_1"},
                        "set": {"localpref": 150, "med": 30}
                    }
                ],
            },
            "as_path": "500 400"
        }
    }
    static_routes (list) = ["10.0.20.1/32"]

    Returns
    -------
    errormsg(str) or True
    """

    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
    for router, rnode in tgen.routers().items():
        if router != dut:
            continue

        logger.info("Verifying BGP set attributes for dut {}:".format(router))

        for static_route in static_routes:
            if vrf:
                cmd = "show bgp vrf {} {} {} json".format(vrf, addr_type, static_route)
            else:
                cmd = "show bgp {} {} json".format(addr_type, static_route)
            show_bgp_json = run_frr_cmd(rnode, cmd, isjson=True)

            dict_to_test = []
            tmp_list = []

            # NOTE(review): input_dict defaults to None but is dereferenced
            # here unconditionally - callers presumably always pass it.
            # Only the first router entry decides whether route-maps exist.
            dict_list = list(input_dict.values())[0]

            if "route_maps" in dict_list:
                for rmap_router in input_dict.keys():
                    for rmap, values in input_dict[rmap_router]["route_maps"].items():
                        if rmap == rmap_name:
                            # Default: test every entry of the route-map
                            dict_to_test = values
                            for rmap_dict in values:
                                if seq_id is not None:
                                    if type(seq_id) is not list:
                                        seq_id = [seq_id]

                                    # Collect only entries whose seq_id was
                                    # requested
                                    if "seq_id" in rmap_dict:
                                        rmap_seq_id = rmap_dict["seq_id"]
                                        for _seq_id in seq_id:
                                            if _seq_id == rmap_seq_id:
                                                tmp_list.append(rmap_dict)
                            if tmp_list:
                                dict_to_test = tmp_list

                            value = None
                            for rmap_dict in dict_to_test:
                                if "set" in rmap_dict:
                                    for criteria in rmap_dict["set"].keys():
                                        found = False
                                        # NOTE(review): raises KeyError when
                                        # the prefix has no "paths" entry yet
                                        for path in show_bgp_json["paths"]:
                                            if criteria not in path:
                                                continue

                                            # aspath is a dict, compare its
                                            # string form
                                            if criteria == "aspath":
                                                value = path[criteria]["string"]
                                            else:
                                                value = path[criteria]

                                            if rmap_dict["set"][criteria] == value:
                                                found = True
                                                logger.info(
                                                    "Verifying BGP "
                                                    "attribute {} for"
                                                    " route: {} in "
                                                    "router: {}, found"
                                                    " expected value:"
                                                    " {}".format(
                                                        criteria,
                                                        static_route,
                                                        dut,
                                                        value,
                                                    )
                                                )
                                                break

                                        # `value` holds the last value seen
                                        # for this or an earlier criteria
                                        if not found:
                                            errormsg = (
                                                "Failed: Verifying BGP "
                                                "attribute {} for route:"
                                                " {} in router: {}, "
                                                " expected value: {} but"
                                                " found: {}".format(
                                                    criteria,
                                                    static_route,
                                                    dut,
                                                    rmap_dict["set"][criteria],
                                                    value,
                                                )
                                            )
                                            return errormsg

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return True
2494 @retry(retry_timeout
=8)
2495 def verify_best_path_as_per_bgp_attribute(
2496 tgen
, addr_type
, router
, input_dict
, attribute
, expected
=True
2499 API is to verify best path according to BGP attributes for given routes.
2500 "show bgp ipv4/6 json" command will be run and verify best path according
2501 to shortest as-path, highest local-preference and med, lowest weight and
2502 route origin IGP>EGP>INCOMPLETE.
2505 * `tgen` : topogen object
2506 * `addr_type` : ip type, ipv4/ipv6
2507 * `tgen` : topogen object
2508 * `attribute` : calculate best path using this attribute
2509 * `input_dict`: defines different routes to calculate for which route
2510 best path is selected
2511 * `expected` : expected results from API, by-default True
2515 # To verify best path for routes 200.50.2.0/32 and 200.60.2.0/32 from
2516 router r7 to router r1(DUT) as per shortest as-path attribute
2523 "advertise_networks": [
2525 "network": "200.50.2.0/32"
2528 "network": "200.60.2.0/32"
2537 attribute = "locPrf"
2538 result = verify_best_path_as_per_bgp_attribute(tgen, "ipv4", dut, \
2539 input_dict, attribute)
2542 errormsg(str) or True
2545 logger
.debug("Entering lib API: {}".format(sys
._getframe
().f_code
.co_name
))
2547 if router
not in tgen
.routers():
2550 rnode
= tgen
.routers()[router
]
2552 # Verifying show bgp json
2553 command
= "show bgp"
2556 logger
.info("Verifying router %s RIB for best path:", router
)
2558 static_route
= False
2559 advertise_network
= False
2560 for route_val
in input_dict
.values():
2561 if "static_routes" in route_val
:
2563 networks
= route_val
["static_routes"]
2565 advertise_network
= True
2566 net_data
= route_val
["bgp"]["address_family"][addr_type
]["unicast"]
2567 networks
= net_data
["advertise_networks"]
2569 for network
in networks
:
2570 _network
= network
["network"]
2571 no_of_ip
= network
.setdefault("no_of_ip", 1)
2572 vrf
= network
.setdefault("vrf", None)
2575 cmd
= "{} vrf {}".format(command
, vrf
)
2579 cmd
= "{} {}".format(cmd
, addr_type
)
2580 cmd
= "{} json".format(cmd
)
2581 sh_ip_bgp_json
= run_frr_cmd(rnode
, cmd
, isjson
=True)
2583 routes
= generate_ips(_network
, no_of_ip
)
2584 for route
in routes
:
2585 route
= str(ipaddress
.ip_network(frr_unicode(route
)))
2587 if route
in sh_ip_bgp_json
["routes"]:
2588 route_attributes
= sh_ip_bgp_json
["routes"][route
]
2592 for route_attribute
in route_attributes
:
2593 next_hops
= route_attribute
["nexthops"]
2594 for next_hop
in next_hops
:
2595 next_hop_ip
= next_hop
["ip"]
2596 attribute_dict
[next_hop_ip
] = route_attribute
[attribute
]
2599 if attribute
== "path":
2600 # Find next_hop for the route have minimum as_path
2602 attribute_dict
, key
=lambda x
: len(set(attribute_dict
[x
]))
2604 compare
= "SHORTEST"
2606 # LOCAL_PREF attribute
2607 elif attribute
== "locPrf":
2608 # Find next_hop for the route have highest local preference
2610 attribute_dict
, key
=(lambda k
: attribute_dict
[k
])
2615 elif attribute
== "weight":
2616 # Find next_hop for the route have highest weight
2618 attribute_dict
, key
=(lambda k
: attribute_dict
[k
])
2623 elif attribute
== "origin":
2624 # Find next_hop for the route have IGP as origin, -
2625 # - rule is IGP>EGP>INCOMPLETE
2628 for (key
, value
) in attribute_dict
.items()
2634 elif attribute
== "metric":
2635 # Find next_hop for the route have LOWEST MED
2637 attribute_dict
, key
=(lambda k
: attribute_dict
[k
])
2642 if addr_type
== "ipv4":
2643 command_1
= "show ip route"
2645 command_1
= "show ipv6 route"
2648 cmd
= "{} vrf {} json".format(command_1
, vrf
)
2650 cmd
= "{} json".format(command_1
)
2652 rib_routes_json
= run_frr_cmd(rnode
, cmd
, isjson
=True)
2654 # Verifying output dictionary rib_routes_json is not empty
2655 if not bool(rib_routes_json
):
2656 errormsg
= "No route found in RIB of router {}..".format(router
)
2661 # Check whether the best path is installed in RIB
2662 if route
in rib_routes_json
:
2664 # Verify next_hop in rib_routes_json
2666 rib_routes_json
[route
][0]["nexthops"][0]["ip"]
2672 "Incorrect Nexthop for BGP route {} in "
2673 "RIB of router {}, Expected: {}, Found:"
2677 rib_routes_json
[route
][0]["nexthops"][0]["ip"],
2683 if st_found
and nh_found
:
2685 "Best path for prefix: %s with next_hop: %s is "
2686 "installed according to %s %s: (%s) in RIB of "
2692 attribute_dict
[_next_hop
],
2696 logger
.debug("Exiting lib API: {}".format(sys
._getframe
().f_code
.co_name
))
2700 @retry(retry_timeout
=10)
2701 def verify_best_path_as_per_admin_distance(
2702 tgen
, addr_type
, router
, input_dict
, attribute
, expected
=True, vrf
=None
2705 API is to verify best path according to admin distance for given
2706 route. "show ip/ipv6 route json" command will be run and verify
2707 best path according to shortest admin distance.
2711 * `addr_type` : ip type, ipv4/ipv6
2712 * `router`: Device Under Test (DUT) router name
2713 * `tgen` : topogen object
2714 * `attribute` : calculate best path using admin distance
2715 * `input_dict`: defines different routes with different admin distance
2716 to calculate for which route best path is selected
2717 * `expected` : expected results from API, by-default True
2718 * `vrf`: vrf name, pass it to check the routes of a particular vrf.
2722 # To verify best path for route 200.50.2.0/32 from router r2 to
2723 router r1(DUT) as per shortest admin distance which is 60.
2726 "static_routes": [{"network": "200.50.2.0/32", \
2727 "admin_distance": 80, "next_hop": "10.0.0.14"},
2728 {"network": "200.50.2.0/32", \
2729 "admin_distance": 60, "next_hop": "10.0.0.18"}]
2731 attribute = "locPrf"
2732 result = verify_best_path_as_per_admin_distance(tgen, "ipv4", dut, \
2733 input_dict, attribute)
2736 errormsg(str) or True
2739 logger
.debug("Entering lib API: {}".format(sys
._getframe
().f_code
.co_name
))
2740 router_list
= tgen
.routers()
2741 if router
not in router_list
:
2744 rnode
= tgen
.routers()[router
]
2747 logger
.info("Verifying router %s RIB for best path:", router
)
2750 if addr_type
== "ipv4":
2751 command
= "show ip route"
2753 command
= "show ipv6 route"
2756 command
= "{} vrf {} json".format(command
, vrf
)
2758 command
= "{} json".format(command
)
2760 for routes_from_router
in input_dict
.keys():
2761 sh_ip_route_json
= router_list
[routes_from_router
].vtysh_cmd(
2762 command
, isjson
=True
2764 networks
= input_dict
[routes_from_router
]["static_routes"]
2765 for network
in networks
:
2766 route
= network
["network"]
2768 route_attributes
= sh_ip_route_json
[route
]
2772 for route_attribute
in route_attributes
:
2773 next_hops
= route_attribute
["nexthops"]
2774 for next_hop
in next_hops
:
2775 next_hop_ip
= next_hop
["ip"]
2776 attribute_dict
[next_hop_ip
] = route_attribute
["distance"]
2778 # Find the next_hop of the route that has the lowest admin distance
2779 _next_hop
= min(attribute_dict
, key
=(lambda k
: attribute_dict
[k
]))
2783 rib_routes_json
= run_frr_cmd(rnode
, command
, isjson
=True)
2785 # Verifying output dictionary rib_routes_json is not empty
2786 if not bool(rib_routes_json
):
2787 errormsg
= "No route found in RIB of router {}..".format(router
)
2792 # Check whether the best path is installed in RIB
2793 if route
in rib_routes_json
:
2795 # Verify next_hop in rib_routes_json
2798 for nh
in rib_routes_json
[route
][0]["nexthops"]
2799 if nh
["ip"] == _next_hop
2804 "Nexthop {} is Missing for BGP route {}"
2805 " in RIB of router {}\n".format(_next_hop
, route
, router
)
2809 if st_found
and nh_found
:
2811 "Best path for prefix: %s is installed according"
2812 " to %s %s: (%s) in RIB of router %s",
2816 attribute_dict
[_next_hop
],
2820 logger
.debug("Exiting lib API: {}".format(sys
._getframe
().f_code
.co_name
))
2824 @retry(retry_timeout
=30)
2836 This API is to verify whether bgp rib has any
2837 matching route for a nexthop.
2841 * `tgen`: topogen object
2842 * `dut`: input dut router name
2843 * `addr_type` : ip type ipv4/ipv6
2844 * `input_dict` : input dict, has details of static routes
2845 * `next_hop`[optional]: next_hop which needs to be verified,
2847 * 'aspath'[optional]: aspath which needs to be verified
2848 * `expected` : expected results from API, by-default True
2853 next_hop = "192.168.1.10"
2854 input_dict = topo['routers']
2855 aspath = "100 200 300"
2856 result = verify_bgp_rib(tgen, addr_type, dut, input_dict,
2861 errormsg(str) or True
2864 logger
.debug("Entering lib API: verify_bgp_rib()")
2866 router_list
= tgen
.routers()
2867 additional_nexthops_in_required_nhs
= []
2871 for routerInput
in input_dict
.keys():
2872 for router
, rnode
in router_list
.items():
2876 # Verifying RIB routes
2877 command
= "show bgp"
2881 logger
.info("Checking router {} BGP RIB:".format(dut
))
2883 if "static_routes" in input_dict
[routerInput
]:
2884 static_routes
= input_dict
[routerInput
]["static_routes"]
2886 for static_route
in static_routes
:
2892 vrf
= static_route
.setdefault("vrf", None)
2893 community
= static_route
.setdefault("community", None)
2894 largeCommunity
= static_route
.setdefault("largeCommunity", None)
2897 cmd
= "{} vrf {} {}".format(command
, vrf
, addr_type
)
2900 cmd
= "{} community {}".format(cmd
, community
)
2903 cmd
= "{} large-community {}".format(cmd
, largeCommunity
)
2905 cmd
= "{} {}".format(command
, addr_type
)
2907 cmd
= "{} json".format(cmd
)
2909 rib_routes_json
= run_frr_cmd(rnode
, cmd
, isjson
=True)
2911 # Verifying output dictionary rib_routes_json is not empty
2912 if bool(rib_routes_json
) == False:
2913 errormsg
= "No route found in rib of router {}..".format(router
)
2916 network
= static_route
["network"]
2918 if "no_of_ip" in static_route
:
2919 no_of_ip
= static_route
["no_of_ip"]
2923 # Generating IPs for verification
2924 ip_list
= generate_ips(network
, no_of_ip
)
2926 for st_rt
in ip_list
:
2927 st_rt
= str(ipaddress
.ip_network(frr_unicode(st_rt
)))
2929 _addr_type
= validate_ip_address(st_rt
)
2930 if _addr_type
!= addr_type
:
2933 if st_rt
in rib_routes_json
["routes"]:
2935 found_routes
.append(st_rt
)
2937 if next_hop
and multi_nh
and st_found
:
2938 if type(next_hop
) is not list:
2939 next_hop
= [next_hop
]
2943 0, len(rib_routes_json
["routes"][st_rt
])
2948 for rib_r
in rib_routes_json
["routes"][
2953 for mnh
in found_hops
:
2954 for each_nh_in_multipath
in mnh
:
2955 list2
.append(each_nh_in_multipath
)
2957 missing_list_of_nexthops
= set(list2
).difference(
2960 additional_nexthops_in_required_nhs
= set(
2965 if additional_nexthops_in_required_nhs
:
2967 "Missing nexthop %s for route"
2968 " %s in RIB of router %s\n",
2969 additional_nexthops_in_required_nhs
,
2976 elif next_hop
and multi_nh
is None:
2977 if type(next_hop
) is not list:
2978 next_hop
= [next_hop
]
2982 for rib_r
in rib_routes_json
["routes"][st_rt
][0][
2987 missing_list_of_nexthops
= set(list2
).difference(list1
)
2988 additional_nexthops_in_required_nhs
= set(
2993 if additional_nexthops_in_required_nhs
:
2995 "Missing nexthop %s for route"
2996 " %s in RIB of router %s\n",
2997 additional_nexthops_in_required_nhs
,
3002 "Nexthop {} is Missing for "
3003 "route {} in RIB of router {}\n".format(
3004 additional_nexthops_in_required_nhs
,
3014 found_paths
= rib_routes_json
["routes"][st_rt
][0][
3017 if aspath
== found_paths
:
3020 "Found AS path {} for route"
3021 " {} in RIB of router "
3022 "{}\n".format(aspath
, st_rt
, dut
)
3026 "AS Path {} is missing for route"
3027 "for route {} in RIB of router {}\n".format(
3034 missing_routes
.append(st_rt
)
3038 "Found next_hop {} for all bgp"
3040 " router {}\n".format(next_hop
, router
)
3043 if len(missing_routes
) > 0:
3045 "Missing route in RIB of router {}, "
3046 "routes: {}\n".format(dut
, missing_routes
)
3052 "Verified routes in router {} BGP RIB, "
3053 "found routes are: {} \n".format(dut
, found_routes
)
3057 if "bgp" not in input_dict
[routerInput
]:
3060 # Advertise networks
3061 bgp_data_list
= input_dict
[routerInput
]["bgp"]
3063 if type(bgp_data_list
) is not list:
3064 bgp_data_list
= [bgp_data_list
]
3066 for bgp_data
in bgp_data_list
:
3067 vrf_id
= bgp_data
.setdefault("vrf", None)
3069 cmd
= "{} vrf {} {}".format(command
, vrf_id
, addr_type
)
3071 cmd
= "{} {}".format(command
, addr_type
)
3073 cmd
= "{} json".format(cmd
)
3075 rib_routes_json
= run_frr_cmd(rnode
, cmd
, isjson
=True)
3077 # Verifying output dictionary rib_routes_json is not empty
3078 if bool(rib_routes_json
) == False:
3079 errormsg
= "No route found in rib of router {}..".format(router
)
3082 bgp_net_advertise
= bgp_data
["address_family"][addr_type
]["unicast"]
3083 advertise_network
= bgp_net_advertise
.setdefault(
3084 "advertise_networks", []
3087 for advertise_network_dict
in advertise_network
:
3092 network
= advertise_network_dict
["network"]
3094 if "no_of_network" in advertise_network_dict
:
3095 no_of_network
= advertise_network_dict
["no_of_network"]
3099 # Generating IPs for verification
3100 ip_list
= generate_ips(network
, no_of_network
)
3102 for st_rt
in ip_list
:
3103 st_rt
= str(ipaddress
.ip_network(frr_unicode(st_rt
)))
3105 _addr_type
= validate_ip_address(st_rt
)
3106 if _addr_type
!= addr_type
:
3109 if st_rt
in rib_routes_json
["routes"]:
3111 found_routes
.append(st_rt
)
3114 missing_routes
.append(st_rt
)
3116 if len(missing_routes
) > 0:
3118 "Missing route in BGP RIB of router {},"
3119 " are: {}\n".format(dut
, missing_routes
)
3125 "Verified routes in router {} BGP RIB, found "
3126 "routes are: {}\n".format(dut
, found_routes
)
3129 logger
.debug("Exiting lib API: verify_bgp_rib()")
3133 @retry(retry_timeout
=10)
3134 def verify_graceful_restart(
3135 tgen
, topo
, addr_type
, input_dict
, dut
, peer
, expected
=True
3138 This API is to verify graceful-restart configuration of DUT and
3139 cross verify the same from the peer bgp router.
3143 * `tgen`: topogen object
3144 * `topo`: input json file data
3145 * `addr_type` : ip type ipv4/ipv6
3146 * `input_dict`: input dictionary, have details of Device Under Test, for
3147 which user wants to test the data
3148 * `dut`: input dut router name
3149 * `peer`: input peer router name
3150 * `expected` : expected results from API, by-default True
3163 "graceful-restart": True
3176 "graceful-restart": True
3188 result = verify_graceful_restart(tgen, topo, addr_type, input_dict,
3189 dut = "r1", peer = 'r2')
3192 errormsg(str) or True
3195 logger
.debug("Entering lib API: {}".format(sys
._getframe
().f_code
.co_name
))
3197 for router
, rnode
in tgen
.routers().items():
3202 bgp_addr_type
= topo
["routers"][dut
]["bgp"]["address_family"]
3204 bgp_addr_type
= topo
["routers"][dut
]["bgp"][0]["address_family"]
3206 # bgp_addr_type = topo["routers"][dut]["bgp"]["address_family"]
3208 if addr_type
in bgp_addr_type
:
3209 if not check_address_types(addr_type
):
3212 bgp_neighbors
= bgp_addr_type
[addr_type
]["unicast"]["neighbor"]
3214 for bgp_neighbor
, peer_data
in bgp_neighbors
.items():
3215 if bgp_neighbor
!= peer
:
3218 for dest_link
, peer_dict
in peer_data
["dest_link"].items():
3219 data
= topo
["routers"][bgp_neighbor
]["links"]
3221 if dest_link
in data
:
3222 neighbor_ip
= data
[dest_link
][addr_type
].split("/")[0]
3225 "[DUT: {}]: Checking bgp graceful-restart show"
3226 " o/p {}".format(dut
, neighbor_ip
)
3229 show_bgp_graceful_json
= None
3231 show_bgp_graceful_json
= run_frr_cmd(
3233 "show bgp {} neighbor {} graceful-restart json".format(
3234 addr_type
, neighbor_ip
3239 show_bgp_graceful_json_out
= show_bgp_graceful_json
[neighbor_ip
]
3241 if show_bgp_graceful_json_out
["neighborAddr"] == neighbor_ip
:
3243 "[DUT: {}]: Neighbor ip matched {}".format(dut
, neighbor_ip
)
3246 errormsg
= "[DUT: {}]: Neighbor ip NOT a matched {}".format(
3254 if "address_family" in input_dict
[dut
]["bgp"]:
3255 bgp_neighbors
= input_dict
[dut
]["bgp"]["address_family"][addr_type
][
3257 ]["neighbor"][peer
]["dest_link"]
3259 for dest_link
, data
in bgp_neighbors
.items():
3261 "graceful-restart-helper" in data
3262 and data
["graceful-restart-helper"]
3265 elif "graceful-restart" in data
and data
["graceful-restart"]:
3268 "graceful-restart-disable" in data
3269 and data
["graceful-restart-disable"]
3276 if "graceful-restart" in input_dict
[dut
]["bgp"]:
3278 "graceful-restart" in input_dict
[dut
]["bgp"]["graceful-restart"]
3279 and input_dict
[dut
]["bgp"]["graceful-restart"][
3285 "graceful-restart-disable"
3286 in input_dict
[dut
]["bgp"]["graceful-restart"]
3287 and input_dict
[dut
]["bgp"]["graceful-restart"][
3288 "graceful-restart-disable"
3297 if lmode
== "Disable" or lmode
== "Disable*":
3301 if "address_family" in input_dict
[peer
]["bgp"]:
3302 bgp_neighbors
= input_dict
[peer
]["bgp"]["address_family"][addr_type
][
3304 ]["neighbor"][dut
]["dest_link"]
3306 for dest_link
, data
in bgp_neighbors
.items():
3308 "graceful-restart-helper" in data
3309 and data
["graceful-restart-helper"]
3312 elif "graceful-restart" in data
and data
["graceful-restart"]:
3315 "graceful-restart-disable" in data
3316 and data
["graceful-restart-disable"]
3323 if "graceful-restart" in input_dict
[peer
]["bgp"]:
3326 in input_dict
[peer
]["bgp"]["graceful-restart"]
3327 and input_dict
[peer
]["bgp"]["graceful-restart"][
3333 "graceful-restart-disable"
3334 in input_dict
[peer
]["bgp"]["graceful-restart"]
3335 and input_dict
[peer
]["bgp"]["graceful-restart"][
3336 "graceful-restart-disable"
3345 if show_bgp_graceful_json_out
["localGrMode"] == lmode
:
3347 "[DUT: {}]: localGrMode : {} ".format(
3348 dut
, show_bgp_graceful_json_out
["localGrMode"]
3353 "[DUT: {}]: localGrMode is not correct"
3354 " Expected: {}, Found: {}".format(
3355 dut
, lmode
, show_bgp_graceful_json_out
["localGrMode"]
3360 if show_bgp_graceful_json_out
["remoteGrMode"] == rmode
:
3362 "[DUT: {}]: remoteGrMode : {} ".format(
3363 dut
, show_bgp_graceful_json_out
["remoteGrMode"]
3367 show_bgp_graceful_json_out
["remoteGrMode"] == "NotApplicable"
3368 and rmode
== "Disable"
3371 "[DUT: {}]: remoteGrMode : {} ".format(
3372 dut
, show_bgp_graceful_json_out
["remoteGrMode"]
3377 "[DUT: {}]: remoteGrMode is not correct"
3378 " Expected: {}, Found: {}".format(
3379 dut
, rmode
, show_bgp_graceful_json_out
["remoteGrMode"]
3384 logger
.debug("Exiting lib API: {}".format(sys
._getframe
().f_code
.co_name
))
3388 @retry(retry_timeout
=10)
3389 def verify_r_bit(tgen
, topo
, addr_type
, input_dict
, dut
, peer
, expected
=True):
3391 This API is to verify r_bit in the BGP gr capability advertised
3392 by the neighbor router
3396 * `tgen`: topogen object
3397 * `topo`: input json file data
3398 * `addr_type` : ip type ipv4/ipv6
3399 * `input_dict`: input dictionary, have details of Device Under Test, for
3400 which user wants to test the data
3401 * `dut`: input dut router name
3403 * `expected` : expected results from API, by-default True
3417 "graceful-restart": True
3430 "graceful-restart": True
3441 result = verify_r_bit(tgen, topo, addr_type, input_dict, dut, peer)
3445 errormsg(str) or True
3448 logger
.debug("Entering lib API: {}".format(sys
._getframe
().f_code
.co_name
))
3450 for router
, rnode
in tgen
.routers().items():
3454 bgp_addr_type
= topo
["routers"][router
]["bgp"]["address_family"]
3456 if addr_type
in bgp_addr_type
:
3457 if not check_address_types(addr_type
):
3460 bgp_neighbors
= bgp_addr_type
[addr_type
]["unicast"]["neighbor"]
3462 for bgp_neighbor
, peer_data
in bgp_neighbors
.items():
3463 if bgp_neighbor
!= peer
:
3466 for dest_link
, peer_dict
in peer_data
["dest_link"].items():
3467 data
= topo
["routers"][bgp_neighbor
]["links"]
3469 if dest_link
in data
:
3470 neighbor_ip
= data
[dest_link
][addr_type
].split("/")[0]
3473 "[DUT: {}]: Checking bgp graceful-restart show"
3474 " o/p {}".format(dut
, neighbor_ip
)
3477 show_bgp_graceful_json
= run_frr_cmd(
3479 "show bgp {} neighbor {} graceful-restart json".format(
3480 addr_type
, neighbor_ip
3485 show_bgp_graceful_json_out
= show_bgp_graceful_json
[neighbor_ip
]
3487 if show_bgp_graceful_json_out
["neighborAddr"] == neighbor_ip
:
3489 "[DUT: {}]: Neighbor ip matched {}".format(dut
, neighbor_ip
)
3492 errormsg
= "[DUT: {}]: Neighbor ip NOT a matched {}".format(
3497 if "rBit" in show_bgp_graceful_json_out
:
3498 if show_bgp_graceful_json_out
["rBit"]:
3499 logger
.info("[DUT: {}]: Rbit true {}".format(dut
, neighbor_ip
))
3501 errormsg
= "[DUT: {}]: Rbit false {}".format(dut
, neighbor_ip
)
3504 logger
.debug("Exiting lib API: {}".format(sys
._getframe
().f_code
.co_name
))
3508 @retry(retry_timeout
=10)
3509 def verify_eor(tgen
, topo
, addr_type
, input_dict
, dut
, peer
, expected
=True):
3511 This API is to verify EOR
3515 * `tgen`: topogen object
3516 * `topo`: input json file data
3517 * `addr_type` : ip type ipv4/ipv6
3518 * `input_dict`: input dictionary, have details of DUT, for
3519 which user wants to test the data
3520 * `dut`: input dut router name
3535 "graceful-restart": True
3548 "graceful-restart": True
3560 result = verify_eor(tgen, topo, addr_type, input_dict, dut, peer)
3564 errormsg(str) or True
3566 logger
.debug("Entering lib API: {}".format(sys
._getframe
().f_code
.co_name
))
3568 for router
, rnode
in tgen
.routers().items():
3572 bgp_addr_type
= topo
["routers"][router
]["bgp"]["address_family"]
3574 if addr_type
in bgp_addr_type
:
3575 if not check_address_types(addr_type
):
3578 bgp_neighbors
= bgp_addr_type
[addr_type
]["unicast"]["neighbor"]
3580 for bgp_neighbor
, peer_data
in bgp_neighbors
.items():
3581 if bgp_neighbor
!= peer
:
3584 for dest_link
, peer_dict
in peer_data
["dest_link"].items():
3585 data
= topo
["routers"][bgp_neighbor
]["links"]
3587 if dest_link
in data
:
3588 neighbor_ip
= data
[dest_link
][addr_type
].split("/")[0]
3591 "[DUT: %s]: Checking bgp graceful-restart" " show o/p %s",
3596 show_bgp_graceful_json
= run_frr_cmd(
3598 "show bgp {} neighbor {} graceful-restart json".format(
3599 addr_type
, neighbor_ip
3604 show_bgp_graceful_json_out
= show_bgp_graceful_json
[neighbor_ip
]
3606 if show_bgp_graceful_json_out
["neighborAddr"] == neighbor_ip
:
3607 logger
.info("[DUT: %s]: Neighbor ip matched %s", dut
, neighbor_ip
)
3609 errormsg
= "[DUT: %s]: Neighbor ip is NOT matched %s" % (
3615 if addr_type
== "ipv4":
3617 elif addr_type
== "ipv6":
3620 errormsg
= "Address type %s is not supported" % (addr_type
)
3623 eor_json
= show_bgp_graceful_json_out
[afi
]["endOfRibStatus"]
3624 if "endOfRibSend" in eor_json
:
3625 if eor_json
["endOfRibSend"]:
3627 "[DUT: %s]: EOR Send true for %s " "%s", dut
, neighbor_ip
, afi
3630 errormsg
= "[DUT: %s]: EOR Send false for %s" " %s" % (
3637 if "endOfRibRecv" in eor_json
:
3638 if eor_json
["endOfRibRecv"]:
3640 "[DUT: %s]: EOR Recv true %s " "%s", dut
, neighbor_ip
, afi
3643 errormsg
= "[DUT: %s]: EOR Recv false %s " "%s" % (
3650 if "endOfRibSentAfterUpdate" in eor_json
:
3651 if eor_json
["endOfRibSentAfterUpdate"]:
3653 "[DUT: %s]: EOR SendTime true for %s" " %s",
3659 errormsg
= "[DUT: %s]: EOR SendTime false for " "%s %s" % (
3666 logger
.debug("Exiting lib API: {}".format(sys
._getframe
().f_code
.co_name
))
3670 @retry(retry_timeout
=8)
3671 def verify_f_bit(tgen
, topo
, addr_type
, input_dict
, dut
, peer
, expected
=True):
3673 This API is to verify f_bit in the BGP gr capability advertised
3674 by the neighbor router
3678 * `tgen`: topogen object
3679 * `topo`: input json file data
3680 * `addr_type` : ip type ipv4/ipv6
3681 * `input_dict`: input dictionary, have details of Device Under Test, for
3682 which user wants to test the data
3683 * `dut`: input dut router name
3685 * `expected` : expected results from API, by-default True
3699 "graceful-restart": True
3712 "graceful-restart": True
3724 result = verify_f_bit(tgen, topo, 'ipv4', input_dict, dut, peer)
3728 errormsg(str) or True
3731 logger
.debug("Entering lib API: {}".format(sys
._getframe
().f_code
.co_name
))
3733 for router
, rnode
in tgen
.routers().items():
3737 bgp_addr_type
= topo
["routers"][router
]["bgp"]["address_family"]
3739 if addr_type
in bgp_addr_type
:
3740 if not check_address_types(addr_type
):
3743 bgp_neighbors
= bgp_addr_type
[addr_type
]["unicast"]["neighbor"]
3745 for bgp_neighbor
, peer_data
in bgp_neighbors
.items():
3746 if bgp_neighbor
!= peer
:
3749 for dest_link
, peer_dict
in peer_data
["dest_link"].items():
3750 data
= topo
["routers"][bgp_neighbor
]["links"]
3752 if dest_link
in data
:
3753 neighbor_ip
= data
[dest_link
][addr_type
].split("/")[0]
3756 "[DUT: {}]: Checking bgp graceful-restart show"
3757 " o/p {}".format(dut
, neighbor_ip
)
3760 show_bgp_graceful_json
= run_frr_cmd(
3762 "show bgp {} neighbor {} graceful-restart json".format(
3763 addr_type
, neighbor_ip
3768 show_bgp_graceful_json_out
= show_bgp_graceful_json
[neighbor_ip
]
3770 if show_bgp_graceful_json_out
["neighborAddr"] == neighbor_ip
:
3772 "[DUT: {}]: Neighbor ip matched {}".format(dut
, neighbor_ip
)
3775 errormsg
= "[DUT: {}]: Neighbor ip NOT a match {}".format(
3780 if "ipv4Unicast" in show_bgp_graceful_json_out
:
3781 if show_bgp_graceful_json_out
["ipv4Unicast"]["fBit"]:
3783 "[DUT: {}]: Fbit True for {} IPv4"
3784 " Unicast".format(dut
, neighbor_ip
)
3787 errormsg
= "[DUT: {}]: Fbit False for {} IPv4" " Unicast".format(
3792 elif "ipv6Unicast" in show_bgp_graceful_json_out
:
3793 if show_bgp_graceful_json_out
["ipv6Unicast"]["fBit"]:
3795 "[DUT: {}]: Fbit True for {} IPv6"
3796 " Unicast".format(dut
, neighbor_ip
)
3799 errormsg
= "[DUT: {}]: Fbit False for {} IPv6" " Unicast".format(
3804 show_bgp_graceful_json_out
["ipv4Unicast"]
3805 show_bgp_graceful_json_out
["ipv6Unicast"]
3807 logger
.debug("Exiting lib API: {}".format(sys
._getframe
().f_code
.co_name
))
3811 @retry(retry_timeout
=10)
3812 def verify_graceful_restart_timers(tgen
, topo
, addr_type
, input_dict
, dut
, peer
):
3814 This API is to verify graceful restart timers, configured and received
3818 * `tgen`: topogen object
3819 * `topo`: input json file data
3820 * `addr_type` : ip type ipv4/ipv6
3821 * `input_dict`: input dictionary, have details of Device Under Test,
3822 for which user wants to test the data
3823 * `dut`: input dut router name
3825 * `expected` : expected results from API, by-default True
3829 # Configure graceful-restart
3835 "graceful-restart": "graceful-restart-helper"
3838 "gracefulrestart": ["restart-time 150"]
3845 "graceful-restart": "graceful-restart"
3852 result = verify_graceful_restart_timers(tgen, topo, 'ipv4', input_dict, dut, peer)
3856 errormsg(str) or True
3859 logger
.debug("Entering lib API: {}".format(sys
._getframe
().f_code
.co_name
))
3861 for router
, rnode
in tgen
.routers().items():
3865 bgp_addr_type
= topo
["routers"][dut
]["bgp"]["address_family"]
3867 if addr_type
in bgp_addr_type
:
3868 if not check_address_types(addr_type
):
3871 bgp_neighbors
= bgp_addr_type
[addr_type
]["unicast"]["neighbor"]
3873 for bgp_neighbor
, peer_data
in bgp_neighbors
.items():
3874 if bgp_neighbor
!= peer
:
3877 for dest_link
, peer_dict
in peer_data
["dest_link"].items():
3878 data
= topo
["routers"][bgp_neighbor
]["links"]
3880 if dest_link
in data
:
3881 neighbor_ip
= data
[dest_link
][addr_type
].split("/")[0]
3884 "[DUT: {}]: Checking bgp graceful-restart show"
3885 " o/p {}".format(dut
, neighbor_ip
)
3888 show_bgp_graceful_json
= run_frr_cmd(
3890 "show bgp {} neighbor {} graceful-restart json".format(
3891 addr_type
, neighbor_ip
3896 show_bgp_graceful_json_out
= show_bgp_graceful_json
[neighbor_ip
]
3897 if show_bgp_graceful_json_out
["neighborAddr"] == neighbor_ip
:
3899 "[DUT: {}]: Neighbor ip matched {}".format(dut
, neighbor_ip
)
3902 errormsg
= "[DUT: {}]: Neighbor ip is NOT matched {}".format(
3907 # Graceful-restart timer
3908 if "graceful-restart" in input_dict
[peer
]["bgp"]:
3909 if "timer" in input_dict
[peer
]["bgp"]["graceful-restart"]:
3910 for rs_timer
, value
in input_dict
[peer
]["bgp"]["graceful-restart"][
3913 if rs_timer
== "restart-time":
3914 receivedTimer
= value
3916 show_bgp_graceful_json_out
["timers"][
3917 "receivedRestartTimer"
3922 "receivedRestartTimer is {}"
3923 " on {} from peer {}".format(
3924 receivedTimer
, router
, peer
3929 "receivedRestartTimer is not"
3930 " as expected {}".format(receivedTimer
)
3934 logger
.debug("Exiting lib API: {}".format(sys
._getframe
().f_code
.co_name
))
3938 @retry(retry_timeout
=8)
3939 def verify_gr_address_family(
3940 tgen
, topo
, addr_type
, addr_family
, dut
, peer
, expected
=True
3943 This API is to verify gr_address_family in the BGP gr capability advertised
3944 by the neighbor router
3948 * `tgen`: topogen object
3949 * `topo`: input json file data
3950 * `addr_type` : ip type ipv4/ipv6
3951 * `addr_family` : address family to check, ipv4Unicast/ipv6Unicast
3952 * `dut`: input dut router name
3953 * `peer`: input peer router to check
3954 * `expected` : expected results from API, by-default True
3959 result = verify_gr_address_family(tgen, topo, "ipv4", "ipv4Unicast", "r1", "r3")
3963 errormsg(str) or None
3966 logger
.debug("Entering lib API: {}".format(sys
._getframe
().f_code
.co_name
))
3968 if not check_address_types(addr_type
):
3969 logger
.debug("Exiting lib API: {}".format(sys
._getframe
().f_code
.co_name
))
3972 routers
= tgen
.routers()
3973 if dut
not in routers
:
3974 return "{} not in routers".format(dut
)
3976 rnode
= routers
[dut
]
3977 bgp_addr_type
= topo
["routers"][dut
]["bgp"]["address_family"]
3979 if addr_type
not in bgp_addr_type
:
3980 return "{} not in bgp_addr_types".format(addr_type
)
3982 if peer
not in bgp_addr_type
[addr_type
]["unicast"]["neighbor"]:
3983 return "{} not a peer of {} over {}".format(peer
, dut
, addr_type
)
3985 nbr_links
= topo
["routers"][peer
]["links"]
3986 if dut
not in nbr_links
or addr_type
not in nbr_links
[dut
]:
3987 return "peer {} missing back link to {} over {}".format(peer
, dut
, addr_type
)
3989 neighbor_ip
= nbr_links
[dut
][addr_type
].split("/")[0]
3992 "[DUT: {}]: Checking bgp graceful-restart show o/p {} for {}".format(
3993 dut
, neighbor_ip
, addr_family
3997 show_bgp_graceful_json
= run_frr_cmd(
3999 "show bgp {} neighbor {} graceful-restart json".format(addr_type
, neighbor_ip
),
4003 show_bgp_graceful_json_out
= show_bgp_graceful_json
[neighbor_ip
]
4005 if show_bgp_graceful_json_out
["neighborAddr"] == neighbor_ip
:
4006 logger
.info("Neighbor ip matched {}".format(neighbor_ip
))
4008 errormsg
= "Neighbor ip NOT a match {}".format(neighbor_ip
)
4011 if addr_family
== "ipv4Unicast":
4012 if "ipv4Unicast" in show_bgp_graceful_json_out
:
4013 logger
.info("ipv4Unicast present for {} ".format(neighbor_ip
))
4016 errormsg
= "ipv4Unicast NOT present for {} ".format(neighbor_ip
)
4019 elif addr_family
== "ipv6Unicast":
4020 if "ipv6Unicast" in show_bgp_graceful_json_out
:
4021 logger
.info("ipv6Unicast present for {} ".format(neighbor_ip
))
4024 errormsg
= "ipv6Unicast NOT present for {} ".format(neighbor_ip
)
4027 errormsg
= "Aaddress family: {} present for {} ".format(
4028 addr_family
, neighbor_ip
4032 logger
.debug("Exiting lib API: {}".format(sys
._getframe
().f_code
.co_name
))
4035 @retry(retry_timeout
=12)
4036 def verify_attributes_for_evpn_routes(
4050 API to verify rd and rt value using "sh bgp l2vpn evpn 10.1.1.1"
4054 * `tgen`: topogen object
4055 * `topo` : json file data
4056 * `dut` : device under test
4057 * `input_dict`: having details like - for which route, rd value
4058 needs to be verified
4059 * `rd` : route distinguisher
4060 * `rt` : route target
4061 * `ethTag` : Ethernet Tag
4062 * `ipLen` : IP prefix length
4063 * `rd_peer` : Peer name from which RD will be auto-generated
4064 * `rt_peer` : Peer name from which RT will be auto-generated
4065 * `expected` : expected results from API, by-default True
4072 "network": [NETWORK1_1[addr_type]],
4073 "next_hop": NEXT_HOP_IP[addr_type],
4079 result = verify_attributes_for_evpn_routes(tgen, topo,
4080 input_dict, rd = "10.0.0.33:1")
4084 errormsg(str) or True
4087 logger
.debug("Entering lib API: {}".format(sys
._getframe
().f_code
.co_name
))
4088 for router
in input_dict
.keys():
4089 rnode
= tgen
.routers()[dut
]
4091 if "static_routes" in input_dict
[router
]:
4092 for static_route
in input_dict
[router
]["static_routes"]:
4093 network
= static_route
["network"]
4095 if "vrf" in static_route
:
4096 vrf
= static_route
["vrf"]
4098 if type(network
) is not list:
4101 for route
in network
:
4102 route
= route
.split("/")[0]
4103 _addr_type
= validate_ip_address(route
)
4104 if "v4" in _addr_type
:
4106 elif "v6" in _addr_type
:
4109 cmd
= "show bgp l2vpn evpn {} json".format(route
)
4110 evpn_rd_value_json
= run_frr_cmd(rnode
, cmd
, isjson
=True)
4111 if not bool(evpn_rd_value_json
):
4112 errormsg
= "No output for '{}' cli".format(cmd
)
4115 if rd
is not None and rd
!= "auto":
4117 "[DUT: %s]: Verifying rd value for " "evpn route %s:",
4122 if rd
in evpn_rd_value_json
:
4123 rd_value_json
= evpn_rd_value_json
[rd
]
4124 if rd_value_json
["rd"] != rd
:
4126 "[DUT: %s] Failed: Verifying"
4127 " RD value for EVPN route: %s"
4128 "[FAILED]!!, EXPECTED : %s "
4130 % (dut
, route
, rd
, rd_value_json
["rd"])
4136 "[DUT %s]: Verifying RD value for"
4137 " EVPN route: %s [PASSED]|| "
4138 "Found Expected: %s",
4147 "[DUT: %s] RD : %s is not present"
4148 " in cli json output" % (dut
, rd
)
4154 "[DUT: %s]: Verifying auto-rd value for " "evpn route %s:",
4163 rnode
= tgen
.routers()[rd_peer
]
4164 vrfs
= topo
["routers"][rd_peer
]["vrfs"]
4165 for vrf_dict
in vrfs
:
4166 vni_dict
[vrf_dict
["name"]] = index
4169 show_bgp_json
= run_frr_cmd(
4170 rnode
, "show bgp vrf all summary json", isjson
=True
4173 # Verifying output dictionary show_bgp_json is not empty
4174 if not bool(show_bgp_json
):
4175 errormsg
= "BGP is not running"
4178 show_bgp_json_vrf
= show_bgp_json
[vrf
]
4179 for afi
, afi_data
in show_bgp_json_vrf
.items():
4180 if input_afi
not in afi
:
4182 router_id
= afi_data
["routerId"]
4185 rd
= "{}:{}".format(router_id
, vni_dict
[vrf
])
4186 for _rd
, rd_value_json
in evpn_rd_value_json
.items():
4188 str(rd_value_json
["rd"].split(":")[0])
4193 if int(rd_value_json
["rd"].split(":")[1]) > 0:
4198 "[DUT %s]: Verifying RD value for"
4200 "Found Expected: %s",
4203 rd_value_json
["rd"],
4208 "[DUT: %s] Failed: Verifying"
4209 " RD value for EVPN route: %s"
4210 " FOUND : %s" % (dut
, route
, rd_value_json
["rd"])
4217 "[DUT: %s]: Verifying auto-rt value for " "evpn route %s:",
4223 rnode
= tgen
.routers()[rt_peer
]
4224 show_bgp_json
= run_frr_cmd(
4225 rnode
, "show bgp vrf all summary json", isjson
=True
4228 # Verifying output dictionary show_bgp_json is not empty
4229 if not bool(show_bgp_json
):
4230 errormsg
= "BGP is not running"
4233 show_bgp_json_vrf
= show_bgp_json
[vrf
]
4234 for afi
, afi_data
in show_bgp_json_vrf
.items():
4235 if input_afi
not in afi
:
4237 as_num
= afi_data
["as"]
4239 show_vrf_vni_json
= run_frr_cmd(
4240 rnode
, "show vrf vni json", isjson
=True
4243 vrfs
= show_vrf_vni_json
["vrfs"]
4244 for vrf_dict
in vrfs
:
4245 if vrf_dict
["vrf"] == vrf
:
4246 vni_dict
[vrf_dict
["vrf"]] = str(vrf_dict
["vni"])
4248 # If AS is 4 byte, FRR uses only the lower 2 bytes of ASN+VNI
4249 # for auto derived RT value.
4251 as_bin
= bin(as_num
)
4252 as_bin
= as_bin
[-16:]
4253 as_num
= int(as_bin
, 2)
4255 rt
= "{}:{}".format(str(as_num
), vni_dict
[vrf
])
4256 for _rd
, route_data
in evpn_rd_value_json
.items():
4257 if route_data
["ip"] == route
:
4258 for rt_data
in route_data
["paths"]:
4259 if vni_dict
[vrf
] == rt_data
["vni"]:
4260 rt_string
= rt_data
["extendedCommunity"][
4263 rt_input
= "RT:{}".format(rt
)
4264 if rt_input
not in rt_string
:
4273 % (dut
, route
, rt_input
, rt_string
)
4279 "[DUT %s]: Verifying "
4280 "RT value for EVPN "
4281 "route: %s [PASSED]||"
4282 "Found Expected: %s",
4291 "[DUT: %s] Route : %s is not"
4292 " present in cli json output" % (dut
, route
)
4296 if rt
is not None and rt
!= "auto":
4298 "[DUT: %s]: Verifying rt value for " "evpn route %s:",
4303 if type(rt
) is not list:
4307 for _rd
, route_data
in evpn_rd_value_json
.items():
4308 if route_data
["ip"] == route
:
4309 for rt_data
in route_data
["paths"]:
4310 rt_string
= rt_data
["extendedCommunity"][
4313 rt_input
= "RT:{}".format(_rt
)
4314 if rt_input
not in rt_string
:
4316 "[DUT: %s] Failed: "
4317 "Verifying RT value "
4318 "for EVPN route: %s"
4322 % (dut
, route
, rt_input
, rt_string
)
4328 "[DUT %s]: Verifying RT"
4329 " value for EVPN route:"
4331 "Found Expected: %s",
4340 "[DUT: %s] Route : %s is not"
4341 " present in cli json output" % (dut
, route
)
4345 if ethTag
is not None:
4347 "[DUT: %s]: Verifying ethTag value for " "evpn route :", dut
4350 for _rd
, route_data
in evpn_rd_value_json
.items():
4351 if route_data
["ip"] == route
:
4352 if route_data
["ethTag"] != ethTag
:
4354 "[DUT: %s] RD: %s, Failed: "
4355 "Verifying ethTag value "
4356 "for EVPN route: %s"
4365 route_data
["ethTag"],
4372 "[DUT %s]: RD: %s, Verifying "
4373 "ethTag value for EVPN route:"
4375 "Found Expected: %s",
4385 "[DUT: %s] RD: %s, Route : %s "
4386 "is not present in cli json "
4387 "output" % (dut
, _rd
, route
)
4391 if ipLen
is not None:
4393 "[DUT: %s]: Verifying ipLen value for " "evpn route :", dut
4396 for _rd
, route_data
in evpn_rd_value_json
.items():
4397 if route_data
["ip"] == route
:
4398 if route_data
["ipLen"] != int(ipLen
):
4400 "[DUT: %s] RD: %s, Failed: "
4401 "Verifying ipLen value "
4402 "for EVPN route: %s"
4406 % (dut
, _rd
, route
, ipLen
, route_data
["ipLen"])
4412 "[DUT %s]: RD: %s, Verifying "
4413 "ipLen value for EVPN route:"
4415 "Found Expected: %s",
4425 "[DUT: %s] RD: %s, Route : %s "
4426 "is not present in cli json "
4427 "output " % (dut
, _rd
, route
)
4431 logger
.debug("Exiting lib API: {}".format(sys
._getframe
().f_code
.co_name
))
4435 @retry(retry_timeout
=10)
4436 def verify_evpn_routes(
4437 tgen
, topo
, dut
, input_dict
, routeType
=5, EthTag
=0, next_hop
=None, expected
=True
4440 API to verify evpn routes using "sh bgp l2vpn evpn"
4444 * `tgen`: topogen object
4445 * `topo` : json file data
4446 * `dut` : device under test
4447 * `input_dict`: having details like - for which route, rd value
4448 needs to be verified
4449 * `route_type` : Route type 5 is supported as of now
4450 * `EthTag` : Ethernet tag, by-default is 0
4451 * `next_hop` : Prefered nexthop for the evpn routes
4452 * `expected` : expected results from API, by-default True
4459 "network": [NETWORK1_1[addr_type]],
4460 "next_hop": NEXT_HOP_IP[addr_type],
4465 result = verify_evpn_routes(tgen, topo, input_dict)
4468 errormsg(str) or True
4471 logger
.debug("Entering lib API: {}".format(sys
._getframe
().f_code
.co_name
))
4473 for router
in input_dict
.keys():
4474 rnode
= tgen
.routers()[dut
]
4476 logger
.info("[DUT: %s]: Verifying evpn routes: ", dut
)
4478 if "static_routes" in input_dict
[router
]:
4479 for static_route
in input_dict
[router
]["static_routes"]:
4480 network
= static_route
["network"]
4482 if type(network
) is not list:
4486 for route
in network
:
4488 ip_len
= route
.split("/")[1]
4489 route
= route
.split("/")[0]
4491 prefix
= "[{}]:[{}]:[{}]:[{}]".format(
4492 routeType
, EthTag
, ip_len
, route
4495 cmd
= "show bgp l2vpn evpn route json"
4496 evpn_value_json
= run_frr_cmd(rnode
, cmd
, isjson
=True)
4498 if not bool(evpn_value_json
):
4499 errormsg
= "No output for '{}' cli".format(cmd
)
4502 if evpn_value_json
["numPrefix"] == 0:
4503 errormsg
= "[DUT: %s]: No EVPN prefixes exist" % (dut
)
4506 for key
, route_data_json
in evpn_value_json
.items():
4507 if type(route_data_json
) is dict:
4509 if prefix
not in route_data_json
:
4510 missing_routes
[key
] = prefix
4512 if rd_keys
== len(missing_routes
.keys()):
4515 "Missing EVPN routes: "
4516 "%s [FAILED]!!" % (dut
, list(set(missing_routes
.values())))
4520 for key
, route_data_json
in evpn_value_json
.items():
4521 if type(route_data_json
) is dict:
4522 if prefix
not in route_data_json
:
4525 for paths
in route_data_json
[prefix
]["paths"]:
4527 if path
["routeType"] != routeType
:
4530 "Verifying routeType "
4531 "for EVPN route: %s "
4545 for nh_dict
in path
["nexthops"]:
4546 if nh_dict
["ip"] != next_hop
:
4566 "[DUT %s]: Verifying "
4577 logger
.debug("Exiting lib API: {}".format(sys
._getframe
().f_code
.co_name
))
@retry(retry_timeout=10)
def verify_bgp_bestpath(tgen, addr_type, input_dict):
    """
    Verify the next hop chosen as BGP best path for the given networks.

    Parameters
    ----------
    * `tgen` : topogen object
    * `addr_type` : address type, ipv4/ipv6
    * `input_dict` : per-DUT data; for every DUT the entry under `addr_type`
                     is a dict (or a list of dicts) with the keys "network",
                     "bestpath" and, optionally, "vrf"

    Usage
    -----
        input_dict_1 = {
            "r1": {
                "ipv4": {
                    "bestpath": "50.0.0.1",
                    "multipath": ["50.0.0.1", "50.0.0.2"],
                    "network": "100.0.0.0/24"
                }
            }
        }
        result = verify_bgp_bestpath(tgen, "ipv4", input_dict_1)

    Returns
    -------
    errormsg(str) or True; False when nothing was verified for the last DUT
    """

    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))

    result = False
    for dut in input_dict.keys():
        rnode = tgen.routers()[dut]

        logger.info("[DUT: %s]: Verifying bgp bestpath and multipath " "routes:", dut)
        result = False

        # Accept both a single dict (as shown in the usage example) and a
        # list of dicts.
        network_dicts = input_dict[dut][addr_type]
        if isinstance(network_dicts, dict):
            network_dicts = [network_dicts]

        for network_dict in network_dicts:
            # Plain lookups: the caller's dictionary must not be modified.
            nw_addr = network_dict.get("network")
            vrf = network_dict.get("vrf")
            bestpath = network_dict.get("bestpath")

            if vrf:
                cmd = "show bgp vrf {} {} {} bestpath json".format(
                    vrf, addr_type, nw_addr
                )
            else:
                cmd = "show bgp {} {} bestpath json".format(addr_type, nw_addr)

            data = run_frr_cmd(rnode, cmd, isjson=True)
            paths = data.get("paths") if isinstance(data, dict) else None
            if not paths:
                return "DUT:[{}] No bestpath found for network: {}".format(
                    dut, nw_addr
                )
            route = paths[0]

            # Reset for every network so a value from a previous iteration
            # can never be compared against the current expectation.
            _bestpath = None
            if route.get("bestpath", {}).get("overall") is True:
                _bestpath = route["nexthops"][0]["ip"]

            if _bestpath != bestpath:
                return (
                    "DUT:[{}] Bestpath do not match for"
                    " network: {}, Expected "
                    " {} as bgp bestpath found {}".format(
                        dut, nw_addr, bestpath, _bestpath
                    )
                )

            logger.info(
                "DUT:[{}] Found expected bestpath: "
                " {} for network: {}".format(dut, _bestpath, nw_addr)
            )
            result = True

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return result
def verify_tcp_mss(tgen, dut, neighbour, configured_tcp_mss, vrf=None):
    """
    Verify the TCP-MSS value configured for a BGP neighbour of the DUT.

    Parameters
    ----------
    * `tgen` : topogen object
    * `dut` : device under test
    * `neighbour` : neighbour IP address
    * `configured_tcp_mss` : the TCP-MSS value to be verified
    * `vrf` : VRF name; the default VRF is queried when omitted

    Usage
    -----
    result = verify_tcp_mss(tgen, dut, neighbour, configured_tcp_mss)

    Returns
    -------
    errormsg(str) or True
    """

    api_name = sys._getframe().f_code.co_name
    logger.debug("Entering lib API: {}".format(api_name))
    rnode = tgen.routers()[dut]

    if vrf:
        cmd = "show bgp vrf {} neighbors {} json".format(vrf, neighbour)
    else:
        cmd = "show bgp neighbors {} json".format(neighbour)

    # Execute the command
    show_vrf_stats = run_frr_cmd(rnode, cmd, isjson=True)

    # Verify TCP-MSS on router
    logger.info("Verify that no core is observed")
    if tgen.routers_have_failure():
        errormsg = "Core observed while running CLI: %s" % (cmd)
        return errormsg

    # The neighbour may be absent from the output (e.g. session not yet
    # configured); report that instead of failing on a None lookup.
    neighbour_data = (show_vrf_stats or {}).get(neighbour)
    if neighbour_data is None:
        errormsg = "Neighbor {} not found in output of CLI: {}".format(
            neighbour, cmd
        )
        return errormsg

    found_tcp_mss = neighbour_data.get("bgpTcpMssConfigured")
    if configured_tcp_mss == found_tcp_mss:
        logger.debug("Configured TCP - MSS Found: {}".format(api_name))
        result = True
    else:
        logger.debug(
            "TCP-MSS Mismatch ,configured {} expecting {}".format(
                found_tcp_mss,
                configured_tcp_mss,
            )
        )
        result = "TCP-MSS Mismatch"

    logger.debug("Exiting lib API: {}".format(api_name))
    return result
def get_dut_as_number(tgen, dut):
    """
    API to get the Autonomous System number of the given DUT

    params:
    =======
    tgen : topogen object (NOTE: ignored, the global topogen is used)
    dut : Device Under test

    returns :
    =======
    Success : DUT Autonomous number
    Fail : Boolean False (an error is logged)
    """
    tgen = get_topogen()
    rnode = tgen.routers().get(dut)
    if rnode is None:
        logger.error(
            "[dut {}] ERROR....! DUT doesnot contain any Automnomous number ".format(
                dut
            )
        )
        return False

    show_bgp_json = run_frr_cmd(rnode, "sh ip bgp summary json ", isjson=True)
    # The "ipv4Unicast" section is missing when BGP has no IPv4 unicast
    # instance; treat that the same as "no AS number".
    as_number = (show_bgp_json or {}).get("ipv4Unicast", {}).get("as")
    if as_number:
        logger.info(
            "[dut {}] DUT contains Automnomous number :: {} ".format(
                dut, as_number
            )
        )
        return as_number

    logger.error(
        "[dut {}] ERROR....! DUT doesnot contain any Automnomous number ".format(
            dut
        )
    )
    return False
def get_prefix_count_route(
    tgen, topo, dut, peer, vrf=None, link=None, sent=None, received=None
):
    """
    API to return the prefix count the DUT exchanged with a neighbor

    tgen : topogen object (NOTE: ignored, the global topogen is used)
    topo : json file data
    dut : Device under test
    peer : neighbor on which you are expecting the route to be received
    vrf : VRF name; when given, the received-prefix count of that VRF is
          returned (`sent` is not evaluated in this case)
    link : name of the link on `peer`; defaults to the link named after `dut`
    sent : return the sent-prefix count ("pfxSnt") unless `received` is set
    received : return the received-prefix count ("pfxRcd"), the default

    returns :
        prefix_count as dict with "ipv4_count" and "ipv6_count";
        None when `dut` is not part of the topology (an error is logged)
    """
    # The neighbor IP address is found through the peer's side of the link
    # towards the DUT (vice-versa neighborship).
    peer_link = topo["routers"][peer]["links"][link if link else dut]
    neighbor_ipv4_address = peer_link["ipv4"]
    neighbor_ipv6_address = peer_link["ipv6"]

    neighbor_ipv4_address = neighbor_ipv4_address.split("/")[0]
    neighbor_ipv6_address = neighbor_ipv6_address.split("/")[0]

    tgen = get_topogen()
    rnode = tgen.routers().get(dut)
    if rnode is None:
        # Reported once, and only when the DUT really is unknown.
        logger.error("ERROR...! Unknown dut {} in topolgy".format(dut))
        return None

    if vrf:
        ipv4_cmd = "sh ip bgp vrf {} summary json".format(vrf)
        ipv6_cmd = "sh ip bgp vrf {} ipv6 unicast summary json".format(vrf)
        counter = "pfxRcd"
    else:
        ipv4_cmd = "sh ip bgp summary json "
        ipv6_cmd = "sh ip bgp ipv6 unicast summary json "
        # `received` takes precedence over `sent`; received is the default.
        counter = "pfxSnt" if sent and not received else "pfxRcd"

    show_bgp_json_ipv4 = run_frr_cmd(rnode, ipv4_cmd, isjson=True)
    show_bgp_json_ipv6 = run_frr_cmd(rnode, ipv6_cmd, isjson=True)

    prefix_count = {
        "ipv4_count": show_bgp_json_ipv4["ipv4Unicast"]["peers"][
            neighbor_ipv4_address
        ][counter],
        "ipv6_count": show_bgp_json_ipv6["peers"][neighbor_ipv6_address][counter],
    }

    if vrf:
        logger.info(
            "The Prefix Count of the [DUT:{} : vrf [{}] ] towards neighbor ipv4 : {} and ipv6 : {} is : {}".format(
                dut,
                vrf,
                neighbor_ipv4_address,
                neighbor_ipv6_address,
                prefix_count,
            )
        )
    else:
        logger.info(
            "The Prefix Count of the DUT:{} towards neighbor ipv4 : {} and ipv6 : {} is : {}".format(
                dut, neighbor_ipv4_address, neighbor_ipv6_address, prefix_count
            )
        )
    return prefix_count
4837 @retry(retry_timeout
=5)
4838 def verify_rib_default_route(
4847 expected_aspath
=None,
4850 API to verify the the 'Default route" in BGP RIB with the attributes the rout carries (metric , local preference, )
4854 dut : device under test
4855 routes : default route with expected nexthop
4856 expected_nexthop : the nexthop that is expected the deafult route
4860 logger
.debug("Entering lib API: {}".format(sys
._getframe
().f_code
.co_name
))
4861 tgen
= get_topogen()
4862 connected_routes
= {}
4863 for router
, rnode
in tgen
.routers().items():
4865 ipv4_routes
= run_frr_cmd(rnode
, "sh ip bgp json", isjson
=True)
4866 ipv6_routes
= run_frr_cmd(rnode
, "sh ip bgp ipv6 unicast json", isjson
=True)
4867 is_ipv4_default_attrib_found
= False
4868 is_ipv6_default_attrib_found
= False
4870 default_ipv4_route
= routes
["ipv4"]
4871 default_ipv6_route
= "::/0"
4872 ipv4_route_Origin
= False
4873 ipv4_route_local_pref
= False
4874 ipv4_route_metric
= False
4876 if default_ipv4_route
in ipv4_routes
["routes"].keys():
4877 nxt_hop_count
= len(ipv4_routes
["routes"][default_ipv4_route
])
4879 for index
in range(nxt_hop_count
):
4880 rib_next_hops
.append(
4881 ipv4_routes
["routes"][default_ipv4_route
][index
]["nexthops"][0]["ip"]
4884 for nxt_hop
in expected_nexthop
.items():
4885 if nxt_hop
[0] == "ipv4":
4886 if nxt_hop
[1] in rib_next_hops
:
4888 "Default routes [{}] obtained from {} .....PASSED".format(
4889 default_ipv4_route
, nxt_hop
[1]
4894 "ERROR ...! Default routes [{}] expected is missing {}".format(
4895 default_ipv4_route
, nxt_hop
[1]
4903 if "origin" in ipv4_routes
["routes"][default_ipv4_route
][0].keys():
4904 ipv4_route_Origin
= ipv4_routes
["routes"][default_ipv4_route
][0]["origin"]
4905 if "locPrf" in ipv4_routes
["routes"][default_ipv4_route
][0].keys():
4906 ipv4_route_local_pref
= ipv4_routes
["routes"][default_ipv4_route
][0][
4909 if "metric" in ipv4_routes
["routes"][default_ipv4_route
][0].keys():
4910 ipv4_route_metric
= ipv4_routes
["routes"][default_ipv4_route
][0]["metric"]
4912 logger
.error("ERROR [ DUT {}] : The Default Route Not found in RIB".format(dut
))
4915 origin_found
= False
4916 locPrf_found
= False
4917 metric_found
= False
4918 as_path_found
= False
4921 if origin
== ipv4_route_Origin
:
4923 "Dafault Route {} expected origin {} Found in RIB....PASSED".format(
4924 default_ipv4_route
, origin
4930 "ERROR... IPV4::! Expected Origin is {} obtained {}".format(
4931 origin
, ipv4_route_Origin
4939 if locPrf
== ipv4_route_local_pref
:
4941 "Dafault Route {} expected local preference {} Found in RIB....PASSED".format(
4942 default_ipv4_route
, locPrf
4948 "ERROR... IPV4::! Expected Local preference is {} obtained {}".format(
4949 locPrf
, ipv4_route_local_pref
4957 if metric
== ipv4_route_metric
:
4959 "Dafault Route {} expected metric {} Found in RIB....PASSED".format(
4960 default_ipv4_route
, metric
4967 "ERROR... IPV4::! Expected metric is {} obtained {}".format(
4968 metric
, ipv4_route_metric
4976 obtained_aspath
= ipv4_routes
["routes"]["0.0.0.0/0"][0]["path"]
4977 if expected_aspath
in obtained_aspath
:
4978 as_path_found
= True
4980 "Dafault Route {} expected AS path {} Found in RIB....PASSED".format(
4981 default_ipv4_route
, expected_aspath
4986 "ERROR.....! Expected AS path {} obtained {}..... FAILED ".format(
4987 expected_aspath
, obtained_aspath
4992 as_path_found
= True
4994 if origin_found
and locPrf_found
and metric_found
and as_path_found
:
4995 is_ipv4_default_attrib_found
= True
4997 "IPV4:: Expected origin ['{}'] , Local Preference ['{}'] , Metric ['{}'] and AS path [{}] is found in RIB".format(
4998 origin
, locPrf
, metric
, expected_aspath
5002 is_ipv4_default_attrib_found
= False
5004 "IPV4:: Expected origin ['{}'] Obtained [{}]".format(
5005 origin
, ipv4_route_Origin
5009 "IPV4:: Expected locPrf ['{}'] Obtained [{}]".format(
5010 locPrf
, ipv4_route_local_pref
5014 "IPV4:: Expected metric ['{}'] Obtained [{}]".format(
5015 metric
, ipv4_route_metric
5019 "IPV4:: Expected metric ['{}'] Obtained [{}]".format(
5020 expected_aspath
, obtained_aspath
5024 route_Origin
= False
5025 route_local_pref
= False
5026 route_local_metric
= False
5027 default_ipv6_route
= ""
5029 ipv6_routes
["routes"]["0::0/0"]
5030 default_ipv6_route
= "0::0/0"
5032 ipv6_routes
["routes"]["::/0"]
5033 default_ipv6_route
= "::/0"
5034 if default_ipv6_route
in ipv6_routes
["routes"].keys():
5035 nxt_hop_count
= len(ipv6_routes
["routes"][default_ipv6_route
])
5037 for index
in range(nxt_hop_count
):
5038 rib_next_hops
.append(
5039 ipv6_routes
["routes"][default_ipv6_route
][index
]["nexthops"][0]["ip"]
5042 rib_next_hops
.append(
5043 ipv6_routes
["routes"][default_ipv6_route
][index
]["nexthops"][1][
5047 except (KeyError, IndexError) as e
:
5048 logger
.error("NO impact ..! Global IPV6 Address not found ")
5050 for nxt_hop
in expected_nexthop
.items():
5051 if nxt_hop
[0] == "ipv6":
5052 if nxt_hop
[1] in rib_next_hops
:
5054 "Default routes [{}] obtained from {} .....PASSED".format(
5055 default_ipv6_route
, nxt_hop
[1]
5060 "ERROR ...! Default routes [{}] expected from {} obtained {}".format(
5061 default_ipv6_route
, nxt_hop
[1], rib_next_hops
5068 if "origin" in ipv6_routes
["routes"][default_ipv6_route
][0].keys():
5069 route_Origin
= ipv6_routes
["routes"][default_ipv6_route
][0]["origin"]
5070 if "locPrf" in ipv6_routes
["routes"][default_ipv6_route
][0].keys():
5071 route_local_pref
= ipv6_routes
["routes"][default_ipv6_route
][0]["locPrf"]
5072 if "metric" in ipv6_routes
["routes"][default_ipv6_route
][0].keys():
5073 route_local_metric
= ipv6_routes
["routes"][default_ipv6_route
][0]["metric"]
5075 origin_found
= False
5076 locPrf_found
= False
5077 metric_found
= False
5078 as_path_found
= False
5081 if origin
== route_Origin
:
5083 "Dafault Route {} expected origin {} Found in RIB....PASSED".format(
5084 default_ipv6_route
, route_Origin
5090 "ERROR... IPV6::! Expected Origin is {} obtained {}".format(
5091 origin
, route_Origin
5099 if locPrf
== route_local_pref
:
5101 "Dafault Route {} expected Local Preference {} Found in RIB....PASSED".format(
5102 default_ipv6_route
, route_local_pref
5108 "ERROR... IPV6::! Expected Local Preference is {} obtained {}".format(
5109 locPrf
, route_local_pref
5117 if metric
== route_local_metric
:
5119 "Dafault Route {} expected metric {} Found in RIB....PASSED".format(
5120 default_ipv4_route
, metric
5127 "ERROR... IPV6::! Expected metric is {} obtained {}".format(
5128 metric
, route_local_metric
5136 obtained_aspath
= ipv6_routes
["routes"]["::/0"][0]["path"]
5137 if expected_aspath
in obtained_aspath
:
5138 as_path_found
= True
5140 "Dafault Route {} expected AS path {} Found in RIB....PASSED".format(
5141 default_ipv4_route
, expected_aspath
5146 "ERROR.....! Expected AS path {} obtained {}..... FAILED ".format(
5147 expected_aspath
, obtained_aspath
5152 as_path_found
= True
5154 if origin_found
and locPrf_found
and metric_found
and as_path_found
:
5155 is_ipv6_default_attrib_found
= True
5157 "IPV6:: Expected origin ['{}'] , Local Preference ['{}'] , Metric ['{}'] and AS path [{}] is found in RIB".format(
5158 origin
, locPrf
, metric
, expected_aspath
5162 is_ipv6_default_attrib_found
= False
5164 "IPV6:: Expected origin ['{}'] Obtained [{}]".format(origin
, route_Origin
)
5167 "IPV6:: Expected locPrf ['{}'] Obtained [{}]".format(
5168 locPrf
, route_local_pref
5172 "IPV6:: Expected metric ['{}'] Obtained [{}]".format(
5173 metric
, route_local_metric
5177 "IPV6:: Expected metric ['{}'] Obtained [{}]".format(
5178 expected_aspath
, obtained_aspath
5182 if is_ipv4_default_attrib_found
and is_ipv6_default_attrib_found
:
5183 logger
.info("The attributes are found for default route in RIB ")
@retry(retry_timeout=5)
def verify_fib_default_route(tgen, topo, dut, routes, expected_nexthop):
    """
    API to verify the 'Default route' in FIB

    param
    =====
    tgen : topogen object (NOTE: ignored, the global topogen is used)
    topo : json file data (not used by this API)
    dut : device under test
    routes : dict with the default route per address family,
             keys "ipv4" and "ipv6"
    expected_nexthop : dict with the next hop expected for the default
                       route per address family, keys "ipv4" and "ipv6"

    Returns
    =======
    True when the default route is installed with the expected next hop for
    both address families, False otherwise
    """
    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
    tgen = get_topogen()

    rnode = tgen.routers().get(dut)
    if rnode is None:
        logger.error("ERROR...! Unknown dut {} in topolgy".format(dut))
        return False

    ipv4_routes = run_frr_cmd(rnode, "sh ip route json", isjson=True)
    ipv6_routes = run_frr_cmd(rnode, "sh ipv6 route json", isjson=True)

    is_ipv4_default_route_found = False
    is_ipv6_default_route_found = False

    if routes["ipv4"] in ipv4_routes:
        ipv4_default_route = routes["ipv4"]
        # Next hops without an address (e.g. interface-only) are ignored.
        rib_ipv4_nxt_hops = [
            nexthop["ip"]
            for nexthop in ipv4_routes[ipv4_default_route][0]["nexthops"]
            if "ip" in nexthop
        ]

        if expected_nexthop["ipv4"] in rib_ipv4_nxt_hops:
            is_ipv4_default_route_found = True
            logger.info(
                "{} default route with next hop {} is found in FIB ".format(
                    ipv4_default_route, expected_nexthop
                )
            )
        else:
            logger.error(
                "ERROR .. ! {} default route with next hop {} is not found in FIB ".format(
                    ipv4_default_route, expected_nexthop
                )
            )
            return False

    # Zebra reports the IPv6 default route as "::/0" regardless of how the
    # test spelled it, so that key is preferred.
    if "::/0" in ipv6_routes:
        ipv6_default_route = "::/0"
    elif routes["ipv6"] in ipv6_routes:
        ipv6_default_route = routes["ipv6"]
    else:
        ipv6_default_route = None

    if ipv6_default_route is not None:
        rib_ipv6_nxt_hops = [
            nexthop["ip"]
            for nexthop in ipv6_routes[ipv6_default_route][0]["nexthops"]
            if "ip" in nexthop
        ]

        if expected_nexthop["ipv6"] in rib_ipv6_nxt_hops:
            is_ipv6_default_route_found = True
            logger.info(
                "{} default route with next hop {} is found in FIB ".format(
                    ipv6_default_route, expected_nexthop
                )
            )
        else:
            logger.error(
                "ERROR .. ! {} default route with next hop {} is not found in FIB ".format(
                    ipv6_default_route, expected_nexthop
                )
            )
            return False

    if is_ipv4_default_route_found and is_ipv6_default_route_found:
        return True

    logger.error(
        "Default Route for ipv4 and ipv6 address family is not found in FIB "
    )
    return False
@retry(retry_timeout=5)
def verify_bgp_advertised_routes_from_neighbor(tgen, topo, dut, peer, expected_routes):
    """
    API to verify the routes that are advertised from dut to peer

    command used :
    "sh ip bgp neighbor <x.x.x.x> advertised-routes" and
    "sh ip bgp ipv6 unicast neighbor <x::x> advertised-routes"

    tgen : topogen object (NOTE: ignored, the global topogen is used)
    topo : json file data
    dut : Device Under Test
    peer : Peer on which the routes are expected
    expected_routes : dual stack IPv4 and IPv6 routes to be verified, a dict
                      with keys "ipv4" and "ipv6", each a list of dicts
                      carrying a "network" key

    returns: True / False
    """
    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
    tgen = get_topogen()

    addr_types = ("ipv4", "ipv6")
    peer_link = topo["routers"][peer]["links"][dut]
    neighbor_ips = {
        addr_type: peer_link[addr_type].split("/")[0] for addr_type in addr_types
    }

    rnode = tgen.routers().get(dut)
    if rnode is None:
        logger.error("ERROR...! Unknown dut {} in topolgy".format(dut))
        return False

    commands = {
        "ipv4": "sh ip bgp neighbor {} advertised-routes json",
        "ipv6": "sh ip bgp ipv6 unicast neighbor {} advertised-routes json",
    }
    # Both commands are executed up front, before any evaluation.
    outputs = {}
    for addr_type in addr_types:
        outputs[addr_type] = run_frr_cmd(
            rnode,
            commands[addr_type].format(neighbor_ips[addr_type]),
            isjson=True,
        )

    route_counts = {}
    for addr_type in addr_types:
        output = outputs[addr_type]
        if not output:
            logger.error(output)
            logger.error(
                "ERROR...! [DUT : {}] No {} Routes are advertised to the peer {}".format(
                    dut, addr_type.upper(), peer
                )
            )
            return False

        advertised = output.get("advertisedRoutes", {})
        # Only present when default-originate is in effect for the peer.
        default_network = output.get("bgpOriginatingDefaultNetwork", "")

        matched = 0
        for route in expected_routes[addr_type]:
            network = route["network"]
            if network in advertised or network in default_network:
                matched += 1
                logger.info(
                    "Success [DUT : {}] The Expected Route {} is advertised to {} ".format(
                        dut, network, peer
                    )
                )
            else:
                logger.error(
                    "ERROR....![DUT : {}] The Expected Route {} is not advertised to {} ".format(
                        dut, network, peer
                    )
                )
        route_counts[addr_type] = matched

    if all(
        route_counts[addr_type] == len(expected_routes[addr_type])
        for addr_type in addr_types
    ):
        return True

    for addr_type in addr_types:
        logger.error(
            "ERROR ....! {} : Expected Routes -> {} obtained ->{} ".format(
                addr_type.upper(),
                expected_routes[addr_type],
                outputs[addr_type].get("advertisedRoutes"),
            )
        )
    return False
@retry(retry_timeout=5)
def verify_bgp_received_routes_from_neighbor(tgen, topo, dut, peer, expected_routes):
    """
    API to verify the bgp received routes

    command used :
    show ip bgp neighbor <x.x.x.x> received-routes
    show ip bgp ipv6 unicast neighbor <x::x> received-routes

    Inbound soft-reconfiguration is enabled towards the peer first, because
    the received-routes output is only available with it.

    tgen : topogen object (NOTE: replaced by the global topogen)
    topo : json file data
    dut : Device Under Test
    peer : Peer from which the routes are expected
    expected_routes : dual stack IPv4 and IPv6 routes to be verified, a dict
                      with keys "ipv4" and "ipv6", each a list of dicts
                      carrying a "network" key

    returns: True / False
    raises: AssertionError when the soft configuration cannot be applied
    """
    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
    tgen = get_topogen()

    addr_types = ("ipv4", "ipv6")
    peer_link = topo["routers"][peer]["links"][dut]
    peer_ipv4_neighbor_ip = peer_link["ipv4"].split("/")[0]
    peer_ipv6_neighbor_ip = peer_link["ipv6"].split("/")[0]

    logger.info("Enabling Soft configuration to neighbor INBOUND ")
    neigbor_dict = {"ipv4": peer_ipv4_neighbor_ip, "ipv6": peer_ipv6_neighbor_ip}
    result = configure_bgp_soft_configuration(
        tgen, dut, neigbor_dict, direction="inbound"
    )
    assert (
        result is True
    ), " Failed to configure the soft configuration \n Error: {}".format(result)

    # sleep of 10 sec is required to get the routes on peer after soft
    # configuration
    sleep(10)

    rnode = tgen.routers().get(dut)
    if rnode is None:
        logger.error("ERROR...! Unknown dut {} in topolgy".format(dut))
        return False

    commands = {
        "ipv4": "sh ip bgp neighbor {} received-routes json",
        "ipv6": "sh ip bgp ipv6 unicast neighbor {} received-routes json",
    }
    # Both commands are executed up front, before any evaluation.
    outputs = {}
    for addr_type in addr_types:
        outputs[addr_type] = run_frr_cmd(
            rnode,
            commands[addr_type].format(neigbor_dict[addr_type]),
            isjson=True,
        )

    route_counts = {}
    for addr_type in addr_types:
        output = outputs[addr_type]
        if not output:
            logger.error(output)
            logger.error(
                "ERROR...! [DUT : {}] No {} Routes are received from the peer {}".format(
                    dut, addr_type.upper(), peer
                )
            )
            return False

        received = output.get("receivedRoutes", {})
        matched = 0
        for route in expected_routes[addr_type]:
            network = route["network"]
            if network in received:
                matched += 1
                logger.info(
                    "Success [DUT : {}] The Expected Route {} is received from {} ".format(
                        dut, network, peer
                    )
                )
            else:
                logger.error(
                    "ERROR....![DUT : {}] The Expected Route {} is not received from {} ".format(
                        dut, network, peer
                    )
                )
        route_counts[addr_type] = matched

    if all(
        route_counts[addr_type] == len(expected_routes[addr_type])
        for addr_type in addr_types
    ):
        return True

    # Report what was actually received; the received-routes output has no
    # "advertisedRoutes" section.
    for addr_type in addr_types:
        logger.error(
            "ERROR ....! {} : Expected Routes -> {} obtained ->{} ".format(
                addr_type.upper(),
                expected_routes[addr_type],
                outputs[addr_type].get("receivedRoutes"),
            )
        )
    return False
5538 def configure_bgp_soft_configuration(tgen
, dut
, neighbor_dict
, direction
):
5540 Api to configure the bgp soft configuration to show the received routes from peer
5543 dut : device under test route on which the sonfiguration to be applied
5544 neighbor_dict : dict element contains ipv4 and ipv6 neigbor ip
5545 direction : Directionon which it should be applied in/out
5551 logger
.info("Enabling Soft configuration to neighbor INBOUND ")
5552 local_as
= get_dut_as_number(tgen
, dut
)
5553 ipv4_neighbor
= neighbor_dict
["ipv4"]
5554 ipv6_neighbor
= neighbor_dict
["ipv6"]
5555 direction
= direction
.lower()
5556 if ipv4_neighbor
and ipv4_neighbor
:
5560 "router bgp {}".format(local_as
),
5561 "address-family ipv4 unicast",
5562 "neighbor {} soft-reconfiguration {} ".format(
5563 ipv4_neighbor
, direction
5565 "exit-address-family",
5566 "address-family ipv6 unicast",
5567 "neighbor {} soft-reconfiguration {} ".format(
5568 ipv6_neighbor
, direction
5570 "exit-address-family",
5574 result
= apply_raw_config(tgen
, raw_config
)
5576 "Success... [DUT : {}] The soft configuration onis applied on neighbors {} ".format(