1 # SPDX-License-Identifier: ISC
3 # Copyright (c) 2019 by VMware, Inc. ("VMware")
4 # Used Copyright (c) 2018 by Network Device Education Foundation, Inc.
5 # ("NetDEF") in this file.
11 from copy
import deepcopy
12 from time
import sleep
14 # Import common_config to use commonly used APIs
15 from lib
.common_config
import (
16 create_common_configurations
,
21 find_interface_with_greater_ip
,
23 get_frr_ipv6_linklocal
,
28 from lib
.topogen
import get_topogen
29 from lib
.topolog
import logger
30 from lib
.topotest
import frr_unicode
32 from lib
import topotest
def create_router_bgp(tgen, topo=None, input_dict=None, build=False, load_config=True):
    """
    API to configure bgp on router

    Parameters
    ----------
    * `tgen` : Topogen object
    * `topo` : json file data; defaults to `tgen.json_topo` when None
    * `input_dict` : Input dict data, required when configuring from testcase.
                     When empty, a deep copy of `topo` is used instead.
    * `build` : Only for initial setup phase this is set as True.
    * `load_config` : forwarded unchanged to create_common_configurations()

    Usage
    -----
    input_dict = {
        "r1": {
            "bgp": {
                "router_id": "22.22.22.22",
                "address_family": {
                    "ipv4": {
                        "unicast": {
                            "redistribute": [{"redist_type": "static"}],
                            "advertise_networks": [{"network": "20.0.0.0/32"}],
                        }
                    }
                }
            }
        }
    }
    result = create_router_bgp(tgen, topo, input_dict)

    Returns
    -------
    Result of create_common_configurations(), or False when it raises
    InvalidCLIError.
    """
    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
    result = False

    if topo is None:
        topo = tgen.json_topo

    # Flag is used when testing ipv6 over ipv4 or vice-versa
    afi_test = False

    if not input_dict:
        input_dict = deepcopy(topo)
    else:
        topo = topo["routers"]
        # Work on a copy: the helpers below call setdefault() on the dicts.
        input_dict = deepcopy(input_dict)

    config_data_dict = {}

    for router in input_dict.keys():
        if "bgp" not in input_dict[router]:
            logger.debug("Router %s: 'bgp' not present in input_dict", router)
            continue

        # A router may carry one BGP instance (dict) or several (list).
        bgp_data_list = input_dict[router]["bgp"]
        if not isinstance(bgp_data_list, list):
            bgp_data_list = [bgp_data_list]

        config_data = []

        for bgp_data in bgp_data_list:
            data_all_bgp = __create_bgp_global(tgen, bgp_data, router, build)
            if data_all_bgp:
                bgp_addr_data = bgp_data.setdefault("address_family", {})

                if not bgp_addr_data:
                    logger.debug(
                        "Router %s: 'address_family' not present in "
                        "input_dict for BGP",
                        router,
                    )
                else:
                    ipv4_data = bgp_addr_data.setdefault("ipv4", {})
                    ipv6_data = bgp_addr_data.setdefault("ipv6", {})
                    l2vpn_data = bgp_addr_data.setdefault("l2vpn", {})

                    # Both setdefault() calls must run, so evaluate them
                    # before combining the results.
                    ipv4_unicast = ipv4_data.setdefault("unicast", {})
                    ipv6_unicast = ipv6_data.setdefault("unicast", {})
                    neigh_unicast = bool(ipv4_unicast or ipv6_unicast)

                    l2vpn_evpn = bool(l2vpn_data.setdefault("evpn", {}))

                    if neigh_unicast:
                        data_all_bgp = __create_bgp_unicast_neighbor(
                            tgen,
                            topo,
                            bgp_data,
                            router,
                            afi_test,
                            config_data=data_all_bgp,
                        )

                    if l2vpn_evpn:
                        data_all_bgp = __create_l2vpn_evpn_address_family(
                            tgen, topo, bgp_data, router, config_data=data_all_bgp
                        )

            if data_all_bgp:
                config_data.extend(data_all_bgp)

        if config_data:
            config_data_dict[router] = config_data

    try:
        result = create_common_configurations(
            tgen, config_data_dict, "bgp", build, load_config
        )
    except InvalidCLIError:
        logger.error("create_router_bgp", exc_info=True)
        result = False

    logger.debug("Exiting lib API: create_router_bgp()")
    return result

def __create_bgp_global(tgen, input_dict, router, build=False):
    """
    Helper API to create bgp global configuration.

    Parameters
    ----------
    * `tgen` : Topogen object
    * `input_dict` : Input dict data, required when configuring from testcase
    * `router` : router id to be configured.
    * `build` : Only for initial setup phase this is set as True.

    Returns
    -------
    list of config commands (empty when 'local_as' is missing during build)
    """
    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))

    bgp_data = input_dict
    del_bgp_action = bgp_data.setdefault("delete", False)

    config_data = []

    if "local_as" not in bgp_data and build:
        logger.debug(
            "Router %s: 'local_as' not present in input_dict for BGP", router
        )
        return config_data

    local_as = bgp_data.setdefault("local_as", "")
    cmd = "router bgp {}".format(local_as)
    vrf_id = bgp_data.setdefault("vrf", None)
    if vrf_id:
        cmd = "{} vrf {}".format(cmd, vrf_id)

    if del_bgp_action:
        # Removing the whole instance: nothing else needs configuring.
        config_data.append("no {}".format(cmd))
        return config_data

    config_data.append(cmd)
    config_data.append("no bgp ebgp-requires-policy")

    router_id = bgp_data.setdefault("router_id", None)
    del_router_id = bgp_data.setdefault("del_router_id", False)
    if del_router_id:
        config_data.append("no bgp router-id")
    if router_id:
        config_data.append("bgp router-id {}".format(router_id))

    config_data.append("bgp log-neighbor-changes")
    config_data.append("no bgp network import-check")

    bgp_peer_grp_data = bgp_data.setdefault("peer-group", {})
    if bgp_peer_grp_data:
        peer_grp_data = __create_bgp_peer_group(tgen, bgp_peer_grp_data, router)
        config_data.extend(peer_grp_data)

    bst_path = bgp_data.setdefault("bestpath", None)
    if bst_path and "aspath" in bst_path:
        cmd = "bgp bestpath as-path {}".format(bst_path["aspath"])
        if "delete" in bst_path:
            cmd = "no {}".format(cmd)
        config_data.append(cmd)

    if "graceful-restart" in bgp_data:
        graceful_config = bgp_data["graceful-restart"]

        # key in input_dict -> FRR command. None means "leave untouched",
        # truthy enables the knob and any other value emits the "no" form.
        toggles = (
            ("graceful-restart", "bgp graceful-restart"),
            ("graceful-restart-disable", "bgp graceful-restart-disable"),
            ("preserve-fw-state", "bgp graceful-restart preserve-fw-state"),
            ("disable-eor", "bgp graceful-restart disable-eor"),
        )
        for key, command in toggles:
            flag = graceful_config.setdefault(key, None)
            if flag is None:
                continue
            config_data.append(command if flag else "no {}".format(command))

        if "timer" in graceful_config:
            timer = graceful_config["timer"]
            del_action = timer.get("delete", False)

            for rs_timer, rs_timer_value in timer.items():
                # "delete" is a control key, not a timer name.
                if rs_timer == "delete" or not rs_timer_value:
                    continue

                cmd = "bgp graceful-restart {} {}".format(rs_timer, rs_timer_value)
                if del_action:
                    cmd = "no {}".format(cmd)
                config_data.append(cmd)

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return config_data

def __create_bgp_unicast_neighbor(
    tgen, topo, input_dict, router, afi_test, config_data=None
):
    """
    Helper API to create configuration for address-family unicast

    Parameters
    ----------
    * `tgen` : Topogen object
    * `topo` : json file data
    * `input_dict` : Input dict data, required when configuring from testcase
    * `router` : router id to be configured.
    * `afi_test` : use when ipv6 needs to be tested over ipv4 or vice-versa
    * `config_data` : list of commands built so far; extended in place

    Returns
    -------
    list of config commands
    """
    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))

    if config_data is None:
        config_data = []

    # NOTE(review): this is a whole-element match on the command list, so it
    # only triggers for a bare "router bgp" entry -- confirm the intent.
    add_neigh = "router bgp" not in config_data

    bgp_data = input_dict["address_family"]

    for addr_type, addr_dict in bgp_data.items():
        if not addr_dict:
            continue

        if not check_address_types(addr_type) and not afi_test:
            continue

        addr_data = addr_dict["unicast"]
        if addr_data:
            config_data.append("address-family {} unicast".format(addr_type))

        for advertise_network_dict in addr_data.setdefault("advertise_networks", []):
            network = advertise_network_dict["network"]
            if not isinstance(network, list):
                network = [network]

            no_of_network = advertise_network_dict.get("no_of_network", 1)
            del_action = advertise_network_dict.setdefault("delete", False)

            # Generating IPs for verification
            for ip in generate_ips(network, no_of_network):
                ip = str(ipaddress.ip_network(frr_unicode(ip)))

                cmd = "network {}".format(ip)
                if del_action:
                    cmd = "no {}".format(cmd)
                config_data.append(cmd)

        # "import vrf" is emitted once here; the original handled the same
        # "import" dict in two places and produced the command twice.
        import_vrf_data = addr_data.setdefault("import", {})
        if import_vrf_data:
            cmd = "import vrf {}".format(import_vrf_data["vrf"])
            if import_vrf_data.setdefault("delete", False):
                cmd = "no {}".format(cmd)
            config_data.append(cmd)

        max_paths = addr_data.setdefault("maximum_paths", {})
        if max_paths:
            ibgp = max_paths.setdefault("ibgp", None)
            ebgp = max_paths.setdefault("ebgp", None)
            prefix = "no " if max_paths.setdefault("delete", False) else ""
            if ibgp:
                config_data.append("{}maximum-paths ibgp {}".format(prefix, ibgp))
            if ebgp:
                config_data.append("{}maximum-paths {}".format(prefix, ebgp))

        for aggregate_address in addr_data.setdefault("aggregate_address", []):
            network = aggregate_address.setdefault("network", None)
            if not network:
                logger.debug(
                    "Router %s: 'network' not present in " "input_dict for BGP", router
                )
                continue

            cmd = "aggregate-address {}".format(network)

            as_set = aggregate_address.setdefault("as_set", False)
            summary = aggregate_address.setdefault("summary", False)
            del_action = aggregate_address.setdefault("delete", False)
            if as_set:
                cmd = "{} as-set".format(cmd)
            if summary:
                cmd = "{} summary".format(cmd)
            if del_action:
                cmd = "no {}".format(cmd)

            config_data.append(cmd)

        for redistribute in addr_data.setdefault("redistribute", {}):
            if "redist_type" not in redistribute:
                logger.debug(
                    "Router %s: 'redist_type' not present in " "input_dict", router
                )
                continue

            cmd = "redistribute {}".format(redistribute["redist_type"])
            redist_attr = redistribute.setdefault("attribute", None)
            if redist_attr:
                if isinstance(redist_attr, dict):
                    for key, value in redist_attr.items():
                        cmd = "{} {} {}".format(cmd, key, value)
                else:
                    cmd = "{} {}".format(cmd, redist_attr)

            if redistribute.setdefault("delete", False):
                cmd = "no {}".format(cmd)
            config_data.append(cmd)

        admin_dist_data = addr_data.setdefault("distance", {})
        if admin_dist_data:
            if len(admin_dist_data) < 2:
                logger.debug(
                    "Router %s: pass the admin distance values for "
                    "ebgp, ibgp and local routes",
                    router,
                )
            cmd = "distance bgp {} {} {}".format(
                admin_dist_data["ebgp"],
                admin_dist_data["ibgp"],
                admin_dist_data["local"],
            )

            if admin_dist_data.setdefault("delete", False):
                cmd = "no distance bgp"
            config_data.append(cmd)

        if "neighbor" in addr_data:
            neigh_data = __create_bgp_neighbor(
                topo, input_dict, router, addr_type, add_neigh
            )
            config_data.extend(neigh_data)

        # configure default originate
        if "default_originate" in addr_data:
            default_originate_config = __create_bgp_default_originate_neighbor(
                topo, input_dict, router, addr_type, add_neigh
            )
            config_data.extend(default_originate_config)

    # Second pass: per-neighbor address-family settings go after every
    # neighbor has been declared by the first pass.
    for addr_type, addr_dict in bgp_data.items():
        if not addr_dict or not check_address_types(addr_type):
            continue

        addr_data = addr_dict["unicast"]
        if "neighbor" in addr_data:
            neigh_addr_data = __create_bgp_unicast_address_family(
                topo, input_dict, router, addr_type, add_neigh
            )
            config_data.extend(neigh_addr_data)

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return config_data

def __create_bgp_default_originate_neighbor(
    topo, input_dict, router, addr_type, add_neigh=True
):
    """
    Helper API to create neighbor default - originate configuration

    Parameters
    ----------
    * `topo` : json file data
    * `input_dict` : Input dict data, required when configuring from testcase
    * `router` : router id to be configured
    * `addr_type` : address family key ("ipv4"/"ipv6") being configured
    * `add_neigh` : accepted for signature parity with the sibling helpers;
                    not used here

    Returns
    -------
    list of config commands
    """
    config_data = []
    logger.debug("Entering lib API: __create_bgp_default_originate_neighbor()")

    bgp_data = input_dict["address_family"]
    neigh_data = bgp_data[addr_type]["unicast"]["default_originate"]
    for name, peer_dict in neigh_data.items():
        nh_details = topo[name]

        # Pick the link whose address identifies the neighbor: an explicit
        # "dest-link", else "add_type", else the link named after `router`.
        if "dest-link" in peer_dict:
            link = peer_dict["dest-link"]
        elif "add_type" in peer_dict:
            link = peer_dict["add_type"]
        else:
            link = router
        neighbor_ip = nh_details["links"][link][addr_type].split("/")[0]

        config_data.append("address-family {} unicast".format(addr_type))

        delete = bool(peer_dict.get("delete"))
        if "route_map" in peer_dict:
            route_map = peer_dict["route_map"]
            if delete:
                cmd = "no neighbor {} default-originate route-map {}"
            else:
                # Leading space kept as in the original command string.
                cmd = " neighbor {} default-originate route-map {}"
            config_data.append(cmd.format(neighbor_ip, route_map))
        else:
            if delete:
                cmd = "no neighbor {} default-originate"
            else:
                cmd = "neighbor {} default-originate"
            config_data.append(cmd.format(neighbor_ip))

    logger.debug("Exiting lib API: __create_bgp_default_originate_neighbor()")
    return config_data

def __create_l2vpn_evpn_address_family(
    tgen, topo, input_dict, router, config_data=None
):
    """
    Helper API to create configuration for l2vpn evpn address-family

    Parameters
    ----------
    * `tgen` : Topogen object
    * `topo` : json file data
    * `input_dict` : Input dict data, required when configuring
                     from testcase
    * `router` : router id to be configured.
    * `config_data` : list of commands built so far; extended in place

    Returns
    -------
    list of config commands
    """
    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))

    if config_data is None:
        config_data = []

    bgp_data = input_dict["address_family"]

    for family_type, family_dict in bgp_data.items():
        if family_type != "l2vpn":
            continue

        family_data = family_dict["evpn"]
        if family_data:
            config_data.append("address-family l2vpn evpn")

        advertise_data = family_data.setdefault("advertise", {})
        neighbor_data = family_data.setdefault("neighbor", {})
        advertise_all_vni_data = family_data.setdefault("advertise-all-vni", None)
        rd_data = family_data.setdefault("rd", None)
        no_rd_data = family_data.setdefault("no rd", False)
        route_target_data = family_data.setdefault("route-target", {})

        for address_type, unicast_type in advertise_data.items():
            if not isinstance(unicast_type, dict):
                continue

            for key, value in unicast_type.items():
                cmd = "advertise {} {}".format(address_type, key)

                if value:
                    route_map = value.setdefault("route-map", {})
                    advertise_del_action = value.setdefault("delete", None)

                    if route_map:
                        cmd = "{} route-map {}".format(cmd, route_map)
                    if advertise_del_action:
                        cmd = "no {}".format(cmd)

                config_data.append(cmd)

        # Separate loop-variable name: the original rebound `neighbor_data`
        # while iterating over it.
        for neighbor, neighbor_cfg in neighbor_data.items():
            ipv4_neighbor = neighbor_cfg.setdefault("ipv4", {})
            ipv6_neighbor = neighbor_cfg.setdefault("ipv6", {})

            for neighbor_name, action in ipv4_neighbor.items():
                # NOTE(review): the "ipv4" link key is inferred from the
                # variable name; confirm against the topology JSON schema.
                neighbor_ip = topo[neighbor]["links"][neighbor_name][
                    "ipv4"
                ].split("/")[0]

                if isinstance(action, dict):
                    next_hop_self = action.setdefault("next_hop_self", None)
                    route_maps = action.setdefault("route_maps", {})

                    if next_hop_self is True:
                        config_data.append(
                            "neighbor {} next-hop-self".format(neighbor_ip)
                        )
                    elif next_hop_self is False:
                        config_data.append(
                            "no neighbor {} next-hop-self".format(neighbor_ip)
                        )

                    for route_map in route_maps:
                        name = route_map.setdefault("name", {})
                        direction = route_map.setdefault("direction", "in")
                        del_action = route_map.setdefault("delete", False)

                        if not name:
                            logger.info(
                                "Router %s: 'name' "
                                "not present in "
                                "input_dict for BGP "
                                "neighbor route name",
                                router,
                            )
                            continue

                        cmd = "neighbor {} route-map {} {}".format(
                            neighbor_ip, name, direction
                        )
                        if del_action:
                            cmd = "no {}".format(cmd)
                        config_data.append(cmd)

                elif action == "activate":
                    config_data.append("neighbor {} activate".format(neighbor_ip))
                elif action == "deactivate":
                    config_data.append("no neighbor {} activate".format(neighbor_ip))

            # Fix: iterate the IPv6 neighbors here. The original looped over
            # `ipv4_neighbor` a second time, so `ipv6_neighbor` was never used.
            for neighbor_name, action in ipv6_neighbor.items():
                # NOTE(review): "ipv6" link key inferred, as above.
                neighbor_ip = topo[neighbor]["links"][neighbor_name][
                    "ipv6"
                ].split("/")[0]

                if action == "activate":
                    config_data.append("neighbor {} activate".format(neighbor_ip))
                elif action == "deactivate":
                    config_data.append("no neighbor {} activate".format(neighbor_ip))

        if advertise_all_vni_data is not None:
            if advertise_all_vni_data == True:
                config_data.append("advertise-all-vni")
            elif advertise_all_vni_data == False:
                config_data.append("no advertise-all-vni")

        if rd_data:
            config_data.append("rd {}".format(rd_data))

        if no_rd_data:
            config_data.append("no rd {}".format(no_rd_data))

        for rt_type, rt_dict in route_target_data.items():
            for _rt_dict in rt_dict:
                rt_value = _rt_dict.setdefault("value", None)
                del_rt = _rt_dict.setdefault("delete", None)

                # Without a value there is no command to build; the original
                # would have re-appended whatever `cmd` held before.
                if not rt_value:
                    continue

                cmd = "route-target {} {}".format(rt_type, rt_value)
                if del_rt:
                    cmd = "no {}".format(cmd)
                config_data.append(cmd)

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return config_data

def __create_bgp_peer_group(topo, input_dict, router):
    """
    Helper API to create neighbor specific configuration

    Parameters
    ----------
    * `topo` : json file data (not read by this helper)
    * `input_dict` : Input dict data, required when configuring from testcase;
                     maps peer-group name -> settings dict
    * `router` : router id to be configured (not read by this helper)

    Returns
    -------
    list of config commands
    """
    config_data = []
    logger.debug("Entering lib API: __create_bgp_peer_group()")

    for grp, grp_dict in input_dict.items():
        config_data.append("neighbor {} peer-group".format(grp))
        # Trailing space kept as in the original context string.
        neigh_cxt = "neighbor {} ".format(grp)

        update_source = grp_dict.setdefault("update-source", None)
        remote_as = grp_dict.setdefault("remote-as", None)
        capability = grp_dict.setdefault("capability", None)

        # Only emit the options that were actually supplied.
        if update_source:
            config_data.append("{} update-source {}".format(neigh_cxt, update_source))

        if remote_as:
            config_data.append("{} remote-as {}".format(neigh_cxt, remote_as))

        if capability:
            config_data.append("{} capability {}".format(neigh_cxt, capability))

    logger.debug("Exiting lib API: __create_bgp_peer_group()")
    return config_data

def __create_bgp_neighbor(topo, input_dict, router, addr_type, add_neigh=True):
    """
    Helper API to create neighbor specific configuration

    Parameters
    ----------
    * `topo` : json file data
    * `input_dict` : Input dict data, required when configuring from testcase
    * `router` : router id to be configured
    * `addr_type` : address family key ("ipv4"/"ipv6") being configured
    * `add_neigh` : when False, the plain "remote-as" line is not emitted

    Returns
    -------
    list of config commands

    Raises
    ------
    AssertionError when a peer's "local_asn" has equal local_as and remote_as
    """
    config_data = []
    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
    # `tgen` is needed for the link-local lookups and is not a parameter.
    tgen = get_topogen()
    bgp_data = input_dict["address_family"]
    neigh_data = bgp_data[addr_type]["unicast"]["neighbor"]
    global_connect = input_dict.get("connecttimer", 5)

    for name, peer_dict in neigh_data.items():
        remote_as = 0
        for dest_link, peer in peer_dict["dest_link"].items():
            neighbor_type = peer.get("neighbor_type")

            local_as = 0
            no_prepend = False
            replace_as = False
            local_asn = peer.setdefault("local_asn", {})
            if local_asn:
                local_as = local_asn.setdefault("local_as", 0)
                remote_as = local_asn.setdefault("remote_as", 0)
                no_prepend = local_asn.setdefault("no_prepend", False)
                replace_as = local_asn.setdefault("replace_as", False)
                if local_as == remote_as:
                    # Explicit raise: an `assert` would vanish under -O.
                    raise AssertionError(
                        " Configuration Error : Router must not have "
                        "same AS-NUMBER as Local AS NUMBER"
                    )

            nh_details = topo[name]

            # Resolve the peer's AS unless "local_asn" already supplied it.
            if "vrfs" in topo[router] or isinstance(nh_details["bgp"], list):
                for vrf_data in nh_details["bgp"]:
                    if "vrf" in nh_details["links"][dest_link] and "vrf" in vrf_data:
                        if nh_details["links"][dest_link]["vrf"] == vrf_data["vrf"]:
                            if not remote_as:
                                remote_as = vrf_data["local_as"]
                            break
                    else:
                        if "vrf" not in vrf_data:
                            if not remote_as:
                                remote_as = vrf_data["local_as"]
                            break
            else:
                if not remote_as:
                    remote_as = nh_details["bgp"]["local_as"]

            update_source = None

            if neighbor_type == "unnumbered":
                ip_addr = nh_details["links"][dest_link]["peer-interface"]
            elif neighbor_type == "link-local":
                intf = topo[name]["links"][dest_link]["interface"]
                ip_addr = get_frr_ipv6_linklocal(tgen, name, intf)
            elif dest_link in nh_details["links"].keys():
                try:
                    ip_addr = nh_details["links"][dest_link][addr_type].split("/")[0]
                except KeyError:
                    # No address of this family on the link: fall back to the
                    # interface's IPv6 link-local address.
                    intf = topo[name]["links"][dest_link]["interface"]
                    ip_addr = get_frr_ipv6_linklocal(tgen, name, intf)

            if "delete" in peer and peer["delete"]:
                neigh_cxt = "no neighbor {}".format(ip_addr)
                config_data.append("{}".format(neigh_cxt))
                # NOTE(review): returns after the first deleted neighbor, so
                # later peers are not processed -- confirm this is intended.
                return config_data

            neigh_cxt = "neighbor {}".format(ip_addr)

            if "peer-group" in peer:
                config_data.append(
                    "neighbor {} interface peer-group {}".format(
                        ip_addr, peer["peer-group"]
                    )
                )

            # Loopback interface
            if "source_link" in peer:
                if peer["source_link"] == "lo":
                    update_source = topo[router]["links"]["lo"][addr_type].split("/")[0]
                else:
                    # NOTE(review): "interface" key inferred; confirm.
                    update_source = topo[router]["links"][peer["source_link"]][
                        "interface"
                    ]

            if "peer-group" not in peer:
                if neighbor_type == "unnumbered":
                    config_data.append(
                        "{} interface remote-as {}".format(neigh_cxt, remote_as)
                    )
                elif add_neigh:
                    config_data.append("{} remote-as {}".format(neigh_cxt, remote_as))

            if local_asn and local_as:
                cmd = "{} local-as {}".format(neigh_cxt, local_as)
                if no_prepend:
                    cmd = "{} no-prepend".format(cmd)
                if replace_as:
                    cmd = "{} replace-as".format(cmd)
                config_data.append("{}".format(cmd))

            if addr_type == "ipv6":
                config_data.append("address-family ipv6 unicast")
                config_data.append("{} activate".format(neigh_cxt))

            if neighbor_type == "link-local":
                peer_intf = nh_details["links"][dest_link]["peer-interface"]
                config_data.append("{} update-source {}".format(neigh_cxt, peer_intf))
                config_data.append("{} interface {}".format(neigh_cxt, peer_intf))

            disable_connected = peer.setdefault("disable_connected_check", False)
            connect = peer.get("connecttimer", global_connect)
            keep_alive = peer.setdefault("keepalivetimer", 3)
            hold_down = peer.setdefault("holddowntimer", 10)
            password = peer.setdefault("password", None)
            no_password = peer.setdefault("no_password", None)
            capability = peer.setdefault("capability", None)
            max_hop_limit = peer.setdefault("ebgp_multihop", 1)

            graceful_restart = peer.setdefault("graceful-restart", None)
            graceful_restart_helper = peer.setdefault("graceful-restart-helper", None)
            graceful_restart_disable = peer.setdefault("graceful-restart-disable", None)

            if capability:
                config_data.append("{} capability {}".format(neigh_cxt, capability))

            # Emitted once; the original appended this same line twice.
            if update_source:
                config_data.append(
                    "{} update-source {}".format(neigh_cxt, update_source)
                )

            if disable_connected:
                # Fix: format the neighbor context, not the boolean flag
                # (the original produced "True disable-connected-check").
                config_data.append("{} disable-connected-check".format(neigh_cxt))

            # NOTE(review): `and` skips the timers line when only ONE of the
            # two values equals its 60/180 value -- `or` may have been meant.
            if int(keep_alive) != 60 and int(hold_down) != 180:
                config_data.append(
                    "{} timers {} {}".format(neigh_cxt, keep_alive, hold_down)
                )
            if int(connect) != 120:
                config_data.append("{} timers connect {}".format(neigh_cxt, connect))

            if graceful_restart:
                config_data.append("{} graceful-restart".format(neigh_cxt))
            elif graceful_restart == False:
                config_data.append("no {} graceful-restart".format(neigh_cxt))

            if graceful_restart_helper:
                config_data.append("{} graceful-restart-helper".format(neigh_cxt))
            elif graceful_restart_helper == False:
                config_data.append("no {} graceful-restart-helper".format(neigh_cxt))

            if graceful_restart_disable:
                config_data.append("{} graceful-restart-disable".format(neigh_cxt))
            elif graceful_restart_disable == False:
                config_data.append("no {} graceful-restart-disable".format(neigh_cxt))

            if password:
                config_data.append("{} password {}".format(neigh_cxt, password))

            if no_password:
                config_data.append("no {} password {}".format(neigh_cxt, no_password))

            if max_hop_limit > 1:
                config_data.append(
                    "{} ebgp-multihop {}".format(neigh_cxt, max_hop_limit)
                )
                config_data.append("{} enforce-multihop".format(neigh_cxt))

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return config_data

def __create_bgp_unicast_address_family(
    topo, input_dict, router, addr_type, add_neigh=True
):
    """
    Helper API to create per-neighbor address-family unicast configuration
    (activate/deactivate, next-hop-self, communities, allowas-in, shutdown,
    prefix-lists and route-maps).

    Parameters
    ----------
    * `topo` : json file data
    * `input_dict` : Input dict data, required when configuring from testcase
    * `router` : router id to be configured (used in log messages only)
    * `addr_type` : address family key ("ipv4"/"ipv6") being configured
    * `add_neigh` : accepted for signature parity with the sibling helpers;
                    not used here

    Returns
    -------
    list of config commands
    """
    config_data = []
    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
    tgen = get_topogen()
    bgp_data = input_dict["address_family"]
    neigh_data = bgp_data[addr_type]["unicast"]["neighbor"]

    # Iterate over a copy: the setdefault() calls below must not leak
    # defaults back into the caller's input_dict.
    for peer_name, peer_dict in deepcopy(neigh_data).items():
        for dest_link, peer in peer_dict["dest_link"].items():
            deactivate = None
            nh_details = topo[peer_name]
            neighbor_type = peer.get("neighbor_type")
            activate_addr_family = peer.setdefault("activate", None)
            deactivate_addr_family = peer.setdefault("deactivate", None)

            # Loopback interface
            if "source_link" in peer and peer["source_link"] == "lo":
                for destRouterLink, data in sorted(nh_details["links"].items()):
                    if "type" in data and data["type"] == "loopback":
                        if dest_link == destRouterLink:
                            ip_addr = (
                                nh_details["links"][destRouterLink][addr_type]
                                .split("/")[0]
                                .lower()
                            )

            # Physical interface
            else:
                # check the neighbor type if un numbered nbr, use interface.
                if neighbor_type == "unnumbered":
                    ip_addr = nh_details["links"][dest_link]["peer-interface"]
                elif neighbor_type == "link-local":
                    intf = topo[peer_name]["links"][dest_link]["interface"]
                    ip_addr = get_frr_ipv6_linklocal(tgen, peer_name, intf)
                elif dest_link in nh_details["links"].keys():
                    try:
                        ip_addr = nh_details["links"][dest_link][addr_type].split("/")[
                            0
                        ]
                    except KeyError:
                        intf = topo[peer_name]["links"][dest_link]["interface"]
                        ip_addr = get_frr_ipv6_linklocal(tgen, peer_name, intf)

                    # Under the ipv4 family, the peer's ipv6 address on the
                    # same link is remembered so it can be de-activated below.
                    if (
                        addr_type == "ipv4"
                        and bgp_data["ipv6"]
                        and check_address_types("ipv6")
                        and "ipv6" in nh_details["links"][dest_link]
                    ):
                        deactivate = nh_details["links"][dest_link]["ipv6"].split("/")[
                            0
                        ]

            neigh_cxt = "neighbor {}".format(ip_addr)
            config_data.append("address-family {} unicast".format(addr_type))

            if activate_addr_family is not None:
                config_data.append(
                    "address-family {} unicast".format(activate_addr_family)
                )
                config_data.append("{} activate".format(neigh_cxt))

            if deactivate and activate_addr_family is None:
                config_data.append("no neighbor {} activate".format(deactivate))

            if deactivate_addr_family is not None:
                config_data.append(
                    "address-family {} unicast".format(deactivate_addr_family)
                )
                config_data.append("no {} activate".format(neigh_cxt))

            next_hop_self = peer.setdefault("next_hop_self", None)
            send_community = peer.setdefault("send_community", None)
            prefix_lists = peer.setdefault("prefix_lists", {})
            route_maps = peer.setdefault("route_maps", {})
            no_send_community = peer.setdefault("no_send_community", None)
            capability = peer.setdefault("capability", None)
            allowas_in = peer.setdefault("allowas-in", None)

            # next-hop-self
            if next_hop_self is not None:
                if next_hop_self is True:
                    config_data.append("{} next-hop-self".format(neigh_cxt))
                else:
                    config_data.append("no {} next-hop-self".format(neigh_cxt))

            # send_community
            if send_community:
                config_data.append("{} send-community".format(neigh_cxt))

            # no_send_community
            if no_send_community:
                config_data.append(
                    "no {} send-community {}".format(neigh_cxt, no_send_community)
                )

            # extended next-hop capability: activate the peer under ipv4 too
            if capability and addr_type == "ipv6":
                config_data.append("address-family ipv4 unicast")
                config_data.append("{} activate".format(neigh_cxt))

            if "allowas_in" in peer:
                allow_as_in = peer["allowas_in"]
                config_data.append("{} allowas-in {}".format(neigh_cxt, allow_as_in))

            if "no_allowas_in" in peer:
                allow_as_in = peer["no_allowas_in"]
                config_data.append("no {} allowas-in {}".format(neigh_cxt, allow_as_in))

            if "shutdown" in peer:
                config_data.append(
                    "{} {} shutdown".format(
                        "no" if not peer["shutdown"] else "", neigh_cxt
                    )
                )

            for prefix_list in prefix_lists:
                name = prefix_list.setdefault("name", {})
                direction = prefix_list.setdefault("direction", "in")
                del_action = prefix_list.setdefault("delete", False)
                if not name:
                    logger.info(
                        "Router %s: 'name' not present in "
                        "input_dict for BGP neighbor prefix lists",
                        router,
                    )
                    continue

                cmd = "{} prefix-list {} {}".format(neigh_cxt, name, direction)
                if del_action:
                    cmd = "no {}".format(cmd)
                config_data.append(cmd)

            for route_map in route_maps:
                name = route_map.setdefault("name", {})
                direction = route_map.setdefault("direction", "in")
                del_action = route_map.setdefault("delete", False)
                if not name:
                    logger.info(
                        "Router %s: 'name' not present in "
                        "input_dict for BGP neighbor route name",
                        router,
                    )
                    continue

                cmd = "{} route-map {} {}".format(neigh_cxt, name, direction)
                if del_action:
                    cmd = "no {}".format(cmd)
                config_data.append(cmd)

            if allowas_in:
                number_occurences = allowas_in.setdefault("number_occurences", {})
                del_action = allowas_in.setdefault("delete", False)

                cmd = "{} allowas-in {}".format(neigh_cxt, number_occurences)
                if del_action:
                    cmd = "no {}".format(cmd)
                config_data.append(cmd)

    return config_data

def modify_bgp_config_when_bgpd_down(tgen, topo, input_dict):
    """
    API will save the current config to router's /etc/frr/ for BGPd
    daemon(bgpd.conf file)

    Parameters
    ----------
    * `tgen` : Topogen object
    * `topo` : json file data
    * `input_dict` : defines for which router, and which config
                     needs to be modified

    Usage
    -----
    # Modify graceful-restart config not to set f-bit
    # and write to /etc/frr

    # Api call to delete advertised networks
    input_dict = {
        "r5": {
            "bgp": {
                "address_family": {
                    "ipv4": {
                        "unicast": {
                            "advertise_networks": [
                                {"network": "101.0.20.1/32", "delete": True}
                            ]
                        }
                    },
                    "ipv6": {
                        "unicast": {
                            "advertise_networks": [
                                {"network": "5::1/128", "delete": True}
                            ]
                        }
                    },
                }
            }
        }
    }
    result = modify_bgp_config_when_bgpd_down(tgen, topo, input_dict)

    Returns
    -------
    True on success, the non-True result of create_router_bgp() when that
    fails, or a traceback string when an exception is raised.
    """
    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
    try:
        # load_config=False is forwarded to create_router_bgp().
        result = create_router_bgp(
            tgen, topo, input_dict, build=False, load_config=False
        )
        if result is not True:
            return result

        # Copy bgp config file to /etc/frr
        router_list = tgen.routers()
        for dut in input_dict.keys():
            # Direct lookup instead of scanning every router for a name match.
            if dut not in router_list:
                continue

            logger.info("Delete BGP config when BGPd is down in {}".format(dut))
            # Reading the config from "rundir" and copy to /etc/frr/bgpd.conf
            cmd = "cat {}/{}/{} >> /etc/frr/bgpd.conf".format(
                tgen.logdir, dut, FRRCFG_FILE
            )
            router_list[dut].run(cmd)

    except Exception:
        errormsg = traceback.format_exc()
        logger.error(errormsg)
        return errormsg

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return True

1245 #############################################
1247 #############################################
@retry(retry_timeout=8)
def verify_router_id(tgen, topo, input_dict, expected=True):
    """
    Running command "show bgp summary json" for DUT and reading router-id
    from input_dict and verifying with command output.
    1. Statically modified router-id should take place
    2. When static router-id is deleted highest loopback should
       become router-id
    3. When loopback intf is down then highest physical intf
       should become router-id

    Parameters
    ----------
    * `tgen`: topogen object
    * `topo`: input json file data
    * `input_dict`: input dictionary, have details of Device Under Test, for
                    which user wants to test the data
    * `expected` : expected results from API, by-default True

    Usage
    -----
    # Verify if router-id for r1 is 12.12.12.12
    input_dict = {
        "r1": {
            "bgp": {
                "router_id": "12.12.12.12"
            }
        }
    }
    # Verify that router-id for r1 is highest interface ip
    input_dict = {
        "r1": {
            "bgp": {
                "del_router_id": True
            }
        }
    }
    result = verify_router_id(tgen, topo, input_dict)

    Returns
    -------
    errormsg(str) or True
    """
    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
    for router in input_dict.keys():
        routers = tgen.routers()
        if router not in routers:
            continue

        rnode = routers[router]

        del_router_id = input_dict[router]["bgp"].setdefault("del_router_id", False)

        logger.info("Checking router %s router-id", router)
        show_bgp_json = run_frr_cmd(rnode, "show bgp summary json", isjson=True)
        router_id_out = show_bgp_json["ipv4Unicast"]["routerId"]
        router_id_out = ipaddress.IPv4Address(frr_unicode(router_id_out))

        # Once router-id is deleted, highest interface ip should become
        # router-id
        if del_router_id:
            router_id = find_interface_with_greater_ip(topo, router)
        else:
            router_id = input_dict[router]["bgp"]["router_id"]
        # Compare as IPv4Address objects rather than as raw strings.
        router_id = ipaddress.IPv4Address(frr_unicode(router_id))

        if router_id == router_id_out:
            logger.info("Found expected router-id %s for router %s", router_id, router)
        else:
            errormsg = (
                "Router-id for router:{} mismatch, expected:"
                " {} but found:{}".format(router, router_id, router_id_out)
            )
            return errormsg

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return True

def _bgp_summary_peer_state(vrf_summary, afi_key, neighbor_ip):
    """Return the "state" of neighbor_ip under vrf_summary[afi_key]["peers"], or None if absent."""
    peers = vrf_summary.get(afi_key, {}).get("peers", {})
    return peers.get(neighbor_ip, {}).get("state")


@retry(retry_timeout=150)
def verify_bgp_convergence(tgen, topo=None, dut=None, expected=True):
    """
    API will verify if BGP is converged with in the given time frame.
    Running "show bgp vrf all summary json" command and verify that every
    bgp neighbor defined in topo is in state Established.

    For a BGP instance with an "l2vpn" address family only the evpn
    neighbors are checked (and it is an error if ipv4Unicast/ipv6Unicast
    show up for that VRF); otherwise the unicast neighbors of every
    supported address type are checked and at least one peer is required.

    A peer, VRF or address family that is missing from the command output
    is treated as "not Established", so the function returns an error
    string instead of raising KeyError.

    Parameters
    ----------
    * `tgen`: topogen object. Used to read json_topo when topo is None; the
              router list is taken from get_topogen().
    * `topo`: input json file data
    * `dut`: device under test, when None every router having "bgp" in topo
             is verified
    * `expected` : expected results from API, by-default True. Not read by
                   this function itself; presumably consumed by the @retry
                   decorator - confirm against lib.common_config.

    Usage
    -----
    # To verify if BGP is converged for all the routers used in
    topology
    results = verify_bgp_convergence(tgen, topo, dut="r1")

    Returns
    -------
    errormsg(str) or True (False when no router was verified at all)
    """

    if topo is None:
        topo = tgen.json_topo

    result = False
    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
    tgen = get_topogen()
    for router, rnode in tgen.routers().items():
        if "bgp" not in topo["routers"][router]:
            continue

        if dut is not None and dut != router:
            continue

        logger.info("Verifying BGP Convergence on router %s:", router)
        show_bgp_json = run_frr_cmd(rnode, "show bgp vrf all summary json", isjson=True)
        # Verifying output dictionary show_bgp_json is empty or not
        if not show_bgp_json:
            return "BGP is not running"

        # A router may carry one BGP instance (dict) or several (list)
        bgp_data_list = topo["routers"][router]["bgp"]
        if type(bgp_data_list) is not list:
            bgp_data_list = [bgp_data_list]

        for bgp_data in bgp_data_list:
            vrf = bgp_data.get("vrf")
            if vrf is None:
                vrf = "default"
            vrf_summary = show_bgp_json.get(vrf, {})

            bgp_addr_type = bgp_data["address_family"]
            if "l2vpn" in bgp_addr_type:
                evpn_data = bgp_addr_type["l2vpn"]["evpn"]
                if "neighbor" not in evpn_data:
                    continue

                bgp_neighbors = evpn_data["neighbor"]
                total_evpn_peer = len(bgp_neighbors)
                no_of_evpn_peer = 0
                for bgp_neighbor, peer_data in bgp_neighbors.items():
                    for _addr_type, dest_link_dict in peer_data.items():
                        data = topo["routers"][bgp_neighbor]["links"]
                        for dest_link in dest_link_dict:
                            if dest_link not in data:
                                continue

                            neighbor_ip = data[dest_link][_addr_type].split("/")[0]

                            if (
                                "ipv4Unicast" in vrf_summary
                                or "ipv6Unicast" in vrf_summary
                            ):
                                return (
                                    "[DUT: %s] VRF: %s, "
                                    "ipv4Unicast/ipv6Unicast"
                                    " address-family present"
                                    " under l2vpn" % (router, vrf)
                                )

                            nh_state = _bgp_summary_peer_state(
                                vrf_summary, "l2VpnEvpn", neighbor_ip
                            )
                            if nh_state == "Established":
                                no_of_evpn_peer += 1

                if no_of_evpn_peer != total_evpn_peer:
                    return (
                        "[DUT: %s] VRF: %s, BGP is not converged "
                        "for evpn peers" % (router, vrf)
                    )

                logger.info(
                    "[DUT: %s] VRF: %s, BGP is Converged for " "epvn peers",
                    router,
                    vrf,
                )
                result = True
                continue

            # Unicast neighbors: count configured and Established peers in
            # one pass over the supported address types.
            total_peer = 0
            no_of_peer = 0
            for addr_type in bgp_addr_type:
                if not check_address_types(addr_type):
                    continue

                bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]
                afi_key = "ipv4Unicast" if addr_type == "ipv4" else "ipv6Unicast"

                for bgp_neighbor, peer_data in bgp_neighbors.items():
                    total_peer += len(peer_data["dest_link"])

                    for dest_link, peer_details in peer_data["dest_link"].items():
                        data = topo["routers"][bgp_neighbor]["links"]
                        if dest_link not in data:
                            continue

                        neighbor_type = peer_details.get("neighbor_type")
                        if (
                            "neighbor_type" in peer_details
                            and neighbor_type == "link-local"
                        ):
                            # for link local neighbors
                            intf = data[dest_link]["interface"]
                            neighbor_ip = get_frr_ipv6_linklocal(
                                tgen, bgp_neighbor, intf
                            )
                        elif "source_link" in peer_details:
                            source_link = peer_details["source_link"]
                            neighbor_ip = data[source_link][addr_type].split("/")[0]
                        elif (
                            "neighbor_type" in peer_details
                            and neighbor_type == "unnumbered"
                        ):
                            neighbor_ip = data[dest_link]["peer-interface"]
                        else:
                            neighbor_ip = data[dest_link][addr_type].split("/")[0]

                        # Lower-cased before being used as a key into the
                        # summary output's "peers" dictionary.
                        neighbor_ip = neighbor_ip.lower()
                        nh_state = _bgp_summary_peer_state(
                            vrf_summary, afi_key, neighbor_ip
                        )
                        if nh_state == "Established":
                            no_of_peer += 1

            if no_of_peer == total_peer and no_of_peer > 0:
                logger.info("[DUT: %s] VRF: %s, BGP is Converged", router, vrf)
                result = True
            else:
                return "[DUT: %s] VRF: %s, BGP is not converged" % (router, vrf)

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return result
1511 @retry(retry_timeout
=16)
1512 def verify_bgp_community(
1523 API to verify BGP large community is attached in route for any given
1524 DUT by running "show bgp ipv4/6 {route address} json" command.
1528 * `tgen`: topogen object
1529 * `addr_type` : ip type, ipv4/ipv6
1530 * `dut`: Device Under Test
1531 * `network`: network for which set criteria needs to be verified
1532 * `input_dict`: having details like - for which router, community and
1533 values needs to be verified
1535 * `bestpath`: To check best path cli
1536 * `expected` : expected results from API, by-default True
1540 networks = ["200.50.2.0/32"]
1542 "largeCommunity": "2:1:1 2:2:2 2:3:3 2:4:4 2:5:5"
1544 result = verify_bgp_community(tgen, "ipv4", dut, network, input_dict=None)
1548 errormsg(str) or True
1551 logger
.debug("Entering lib API: verify_bgp_community()")
1552 if router
not in tgen
.routers():
1555 rnode
= tgen
.routers()[router
]
1558 "Verifying BGP community attributes on dut %s: for %s " "network %s",
1564 command
= "show bgp"
1568 cmd
= "{} vrf {} {} {} json".format(command
, vrf
, addr_type
, net
)
1570 cmd
= "{} {} {} bestpath json".format(command
, addr_type
, net
)
1572 cmd
= "{} {} {} json".format(command
, addr_type
, net
)
1574 show_bgp_json
= run_frr_cmd(rnode
, cmd
, isjson
=True)
1575 if "paths" not in show_bgp_json
:
1576 return "Prefix {} not found in BGP table of router: {}".format(net
, router
)
1578 as_paths
= show_bgp_json
["paths"]
1580 for i
in range(len(as_paths
)):
1582 "largeCommunity" in show_bgp_json
["paths"][i
]
1583 or "community" in show_bgp_json
["paths"][i
]
1587 "Large Community attribute is found for route:" " %s in router: %s",
1591 if input_dict
is not None:
1592 for criteria
, comm_val
in input_dict
.items():
1593 show_val
= show_bgp_json
["paths"][i
][criteria
]["string"]
1594 if comm_val
== show_val
:
1596 "Verifying BGP %s for prefix: %s"
1597 " in router: %s, found expected"
1606 "Failed: Verifying BGP attribute"
1607 " {} for route: {} in router: {}"
1608 ", expected value: {} but found"
1609 ": {}".format(criteria
, net
, router
, comm_val
, show_val
)
1615 "Large Community attribute is not found for route: "
1616 "{} in router: {} ".format(net
, router
)
1620 logger
.debug("Exiting lib API: verify_bgp_community()")
def modify_as_number(tgen, topo, input_dict):
    """
    Replace the local AS number of the routers named in input_dict.

    A copy of topo["routers"] is updated with the "local_as" given for each
    router, the existing BGP configuration of those routers is deleted via
    create_router_bgp(), and the updated copy is then applied with a second
    create_router_bgp() call.

    Parameters
    ----------
    * `tgen`  : Topogen object
    * `topo`  : json file data
    * `input_dict` :  defines for which router ASNs needs to be modified;
                      input_dict[router]["bgp"]["local_as"] is read

    Usage
    -----
    To modify ASNs for router r1
    input_dict = {
        "r1": {
            "bgp": {
                "local_as": 131079
            }
        }
    }
    result = modify_as_number(tgen, topo, input_dict)

    Returns
    -------
    errormsg(str) or True
    """

    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
    try:
        updated_routers = deepcopy(topo["routers"])
        delete_request = {}
        for name in input_dict:
            # Remove bgp configuration
            delete_request[name] = {"bgp": {"delete": True}}

            new_local_as = input_dict[name]["bgp"]["local_as"]
            try:
                updated_routers[name]["bgp"]["local_as"] = new_local_as
            except TypeError:
                # "bgp" is a list of instances: update the first one
                updated_routers[name]["bgp"][0]["local_as"] = new_local_as

        logger.info("Removing bgp configuration")
        create_router_bgp(tgen, topo, delete_request)

        logger.info("Applying modified bgp configuration")
        result = create_router_bgp(tgen, updated_routers)
        if result is not True:
            result = "Error applying new AS number config"
    except Exception:
        errormsg = traceback.format_exc()
        logger.error(errormsg)
        return errormsg

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return result
@retry(retry_timeout=8)
def verify_as_numbers(tgen, topo, input_dict, expected=True):
    """
    This API is to verify AS numbers for given DUT by running
    "show ip bgp neighbor json" command. Local AS and Remote AS
    will be verified with input_dict data and command output.

    For every neighbor of a DUT (taken from topo) the expected remote AS is
    the "local_as" that input_dict gives for that neighbor, so input_dict
    must contain an entry for each neighbor as well.

    A dest_link missing from the neighbor's links, or a neighbor address
    missing from the command output, is reported as an error string.

    Parameters
    ----------
    * `tgen`: topogen object
    * `topo`: input json file data
    * `input_dict`: defines - for which router, AS numbers needs to be verified
    * `expected` : expected results from API, by-default True. Not read by
                   this function itself; presumably consumed by the @retry
                   decorator - confirm against lib.common_config.

    Usage
    -----
    input_dict = {
        "r1": {
            "bgp": {
                "local_as": 131079
            }
        }
    }
    result = verify_as_numbers(tgen, topo, input_dict)

    Returns
    -------
    errormsg(str) or True
    """

    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
    router_list = tgen.routers()
    for router in input_dict:
        if router not in router_list:
            continue

        rnode = router_list[router]

        logger.info("Verifying AS numbers for dut %s:", router)

        show_ip_bgp_neighbor_json = run_frr_cmd(
            rnode, "show ip bgp neighbor json", isjson=True
        )
        local_as = input_dict[router]["bgp"]["local_as"]
        bgp_addr_type = topo["routers"][router]["bgp"]["address_family"]

        for addr_type in bgp_addr_type:
            if not check_address_types(addr_type):
                continue

            bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]

            for bgp_neighbor, peer_data in bgp_neighbors.items():
                remote_as = input_dict[bgp_neighbor]["bgp"]["local_as"]
                for dest_link in peer_data["dest_link"]:
                    data = topo["routers"][bgp_neighbor]["links"]

                    if dest_link not in data:
                        return (
                            "Failed: link {} not found in topology of "
                            "neighbor {} for dut {}".format(
                                dest_link, bgp_neighbor, router
                            )
                        )

                    neighbor_ip = data[dest_link][addr_type].split("/")[0]
                    if neighbor_ip not in show_ip_bgp_neighbor_json:
                        return (
                            "Failed: neighbor {} ({}) not found in BGP "
                            "neighbor output of dut {}".format(
                                bgp_neighbor, neighbor_ip, router
                            )
                        )

                    neigh_data = show_ip_bgp_neighbor_json[neighbor_ip]

                    # Verify Local AS for router
                    if neigh_data["localAs"] != local_as:
                        return (
                            "Failed: Verify local_as for dut {},"
                            " found: {} but expected: {}".format(
                                router, neigh_data["localAs"], local_as
                            )
                        )

                    logger.info(
                        "Verified local_as for dut %s, found" " expected: %s",
                        router,
                        local_as,
                    )

                    # Verify Remote AS for neighbor
                    if neigh_data["remoteAs"] != remote_as:
                        return (
                            "Failed: Verify remote_as for dut "
                            "{}'s neighbor {}, found: {} but "
                            "expected: {}".format(
                                router, bgp_neighbor, neigh_data["remoteAs"], remote_as
                            )
                        )

                    logger.info(
                        "Verified remote_as for dut %s's "
                        "neighbor %s, found expected: %s",
                        router,
                        bgp_neighbor,
                        remote_as,
                    )

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return True
@retry(retry_timeout=150)
def verify_bgp_convergence_from_running_config(tgen, dut=None, expected=True):
    """
    API to verify BGP convergence from the running state of the routers.

    Runs "show bgp vrf all summary json" and requires every peer listed
    there, for every VRF and address family, to be in state Established.
    Unlike a topology-driven check, the peers are taken from the command
    output, so this can be used when routers have BGP neighborship between
    loopback and physical interfaces or vice-versa.

    Parameters
    ----------
    * `tgen`: topogen object
    * `dut`: device under test, when None every router is verified
    * `expected` : expected results from API, by-default True. Not read by
                   this function itself; presumably consumed by the @retry
                   decorator - confirm against lib.common_config.

    Usage
    -----
    results = verify_bgp_convergence_from_running_config(tgen, dut="r1")

    Returns
    -------
    errormsg(str) or True
    """

    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))

    for router, rnode in tgen.routers().items():
        if dut is not None and dut != router:
            continue

        logger.info("Verifying BGP Convergence on router %s:", router)
        show_bgp_json = run_frr_cmd(rnode, "show bgp vrf all summary json", isjson=True)
        # Verifying output dictionary show_bgp_json is empty or not
        if not show_bgp_json:
            return "BGP is not running"

        for vrf, addr_family_data in show_bgp_json.items():
            for neighborship_data in addr_family_data.values():
                # Collect the peers that are actually down so the error
                # names them, rather than whichever peer was iterated last.
                down_peers = [
                    peer
                    for peer, peer_data in neighborship_data["peers"].items()
                    if peer_data["state"] != "Established"
                ]

                if down_peers:
                    return (
                        "[DUT: %s] VRF: %s, BGP is not converged"
                        " for peer: %s" % (router, vrf, ", ".join(down_peers))
                    )

            logger.info("[DUT: %s]: vrf: %s, BGP is Converged", router, vrf)

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return True
def clear_bgp(tgen, addr_type, router, vrf=None, neighbor=None):
    """
    Clear BGP neighborship on a router by issuing the matching
    "clear ip bgp" / "clear bgp" command(s).

    Command selection, in order of precedence for ipv4 and ipv6:
    one command per VRF when vrf is given, else a single neighbor when
    neighbor is given, else all sessions of that address type. Any other
    addr_type clears everything with "clear bgp *".

    Parameters
    ----------
    * `tgen`: topogen object
    * `addr_type`: ip type ipv4/ipv6
    * `router`: device under test
    * `vrf`: vrf name or list of vrf names
    * `neighbor`: Neighbor for which bgp needs to be cleared

    Usage
    -----
    clear_bgp(tgen, addr_type, "r1")

    Returns
    -------
    False when router is not part of tgen.routers(), otherwise None
    """

    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))

    routers = tgen.routers()
    if router not in routers:
        return False

    rnode = routers[router]

    # Accept a single vrf name as well as a list of names
    if vrf and type(vrf) is not list:
        vrf = [vrf]

    # Clearing BGP
    logger.info("Clearing BGP neighborship for router %s..", router)
    if addr_type == "ipv4":
        if vrf:
            commands = ["clear ip bgp vrf {} *".format(_vrf) for _vrf in vrf]
        elif neighbor:
            commands = ["clear bgp ipv4 {}".format(neighbor)]
        else:
            commands = ["clear ip bgp *"]
    elif addr_type == "ipv6":
        if vrf:
            commands = ["clear bgp vrf {} ipv6 *".format(_vrf) for _vrf in vrf]
        elif neighbor:
            commands = ["clear bgp ipv6 {}".format(neighbor)]
        else:
            commands = ["clear bgp ipv6 *"]
    else:
        commands = ["clear bgp *"]

    for command in commands:
        run_frr_cmd(rnode, command)

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
def _poll_bgp_convergence_around_clear(
    rnode, topo, router, phase, sleeptime=3, attempts=50
):
    """
    Poll "show bgp summary json" until all unicast peers of router are Established.

    phase is "before" or "after" and only affects the log messages.
    Returns (errormsg, bgp_addr_type, peer_uptime); errormsg is None on
    success, and peer_uptime maps neighbor name to the
    "peerUptimeEstablishedEpoch" value seen on the last poll.
    """
    peer_uptime = {}
    bgp_addr_type = None

    for _attempt in range(attempts):
        # Waiting for BGP to converge
        logger.info(
            "Waiting for %s sec for BGP to converge on router" " %s...",
            sleeptime,
            router,
        )
        sleep(sleeptime)

        show_bgp_json = run_frr_cmd(rnode, "show bgp summary json", isjson=True)
        # Verifying output dictionary show_bgp_json is empty or not
        if not show_bgp_json:
            return "BGP is not running", bgp_addr_type, peer_uptime

        # "bgp" may be a single instance (dict) or a list of instances
        try:
            bgp_addr_type = topo["routers"][router]["bgp"]["address_family"]
        except TypeError:
            bgp_addr_type = topo["routers"][router]["bgp"][0]["address_family"]

        total_peer = 0
        no_of_peer = 0
        for addr_type in bgp_addr_type:
            # The same filter must apply to the total and to the
            # Established count, otherwise they can never match.
            if not check_address_types(addr_type):
                continue

            bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]
            afi_key = "ipv4Unicast" if addr_type == "ipv4" else "ipv6Unicast"

            for bgp_neighbor, peer_data in bgp_neighbors.items():
                total_peer += len(peer_data["dest_link"])

                for dest_link in peer_data["dest_link"]:
                    data = topo["routers"][bgp_neighbor]["links"]
                    if dest_link not in data:
                        continue

                    neighbor_ip = data[dest_link][addr_type].split("/")[0]
                    peer_summary = show_bgp_json[afi_key]["peers"][neighbor_ip]
                    nh_state = peer_summary["state"]

                    # Peer up time dictionary
                    peer_uptime[bgp_neighbor] = peer_summary[
                        "peerUptimeEstablishedEpoch"
                    ]

                    if nh_state == "Established":
                        no_of_peer += 1

        if no_of_peer == total_peer:
            logger.info(
                "BGP is Converged for router %s %s bgp clear", router, phase
            )
            return None, bgp_addr_type, peer_uptime

        logger.info(
            "BGP is not yet Converged for router %s %s bgp clear", router, phase
        )

    errormsg = (
        "TIMEOUT!! BGP is not converged in {} seconds for"
        " router {}".format(attempts * sleeptime, router)
    )
    return errormsg, bgp_addr_type, peer_uptime


def clear_bgp_and_verify(tgen, topo, router, rid=None):
    """
    This API is to clear bgp neighborship and verify bgp neighborship
    is coming up (BGP is converged) using "show bgp summary json" command
    and also verifying for all bgp neighbors uptime before and after
    clear bgp sessions is different as the uptime must be changed once
    bgp sessions are cleared using "clear bgp ipv4 */clear bgp ipv6 *" cmd.

    Steps:
    1. Wait until every unicast peer from topo is Established and record
       each neighbor's "peerUptimeEstablishedEpoch".
    2. Clear the sessions for every configured address family (only the
       peer given by rid when rid is set).
    3. Wait for convergence again and record the values once more.
    4. Fail when the recorded values are identical before and after.

    Parameters
    ----------
    * `tgen`: topogen object
    * `topo`: input json file data
    * `router`: device under test
    * `rid`: peer to clear instead of "*", passed verbatim to
             "clear bgp ipv4|ipv6 <rid>"

    Usage
    -----
    result = clear_bgp_and_verify(tgen, topo, dut)

    Returns
    -------
    errormsg(str) or True (False when router is not part of tgen.routers())
    """

    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))

    routers = tgen.routers()
    if router not in routers:
        return False

    rnode = routers[router]

    # Verifying BGP convergence before bgp clear command
    (
        errormsg,
        bgp_addr_type,
        peer_uptime_before_clear_bgp,
    ) = _poll_bgp_convergence_around_clear(rnode, topo, router, "before")
    if errormsg is not None:
        return errormsg

    # Clearing BGP
    logger.info("Clearing BGP neighborship for router %s..", router)
    clear_target = rid if rid else "*"
    for addr_type in bgp_addr_type:
        if addr_type in ("ipv4", "ipv6"):
            run_frr_cmd(rnode, "clear bgp {} {}".format(addr_type, clear_target))

    # Verifying BGP convergence after bgp clear command
    (
        errormsg,
        _bgp_addr_type,
        peer_uptime_after_clear_bgp,
    ) = _poll_bgp_convergence_around_clear(rnode, topo, router, "after")
    if errormsg is not None:
        return errormsg

    # Comparing peerUptimeEstablishedEpoch dictionaries
    if peer_uptime_before_clear_bgp == peer_uptime_after_clear_bgp:
        return (
            "BGP neighborship is not reset after clear bgp on router"
            " {}".format(router)
        )

    logger.info("BGP neighborship is reset after clear BGP on router %s", router)

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return True
2111 def verify_bgp_timers_and_functionality(tgen
, topo
, input_dict
):
2113 To verify BGP timer config, execute "show ip bgp neighbor json" command
2114 and verify bgp timers with input_dict data.
2115 To verify bgp timers functionality, shutting down peer interface
2116 and verify BGP neighborship status.
2120 * `tgen`: topogen object
2121 * `topo`: input json file data
2122 * `addr_type`: ip type, ipv4/ipv6
2123 * `input_dict`: defines for which router, bgp timers needs to be verified
2126 # To verify BGP timers for neighbor r2 of router r1
2132 "keepalivetimer": 5,
2133 "holddowntimer": 15,
2135 result = verify_bgp_timers_and_functionality(tgen, topo, "ipv4",
2140 errormsg(str) or True
2143 logger
.debug("Entering lib API: {}".format(sys
._getframe
().f_code
.co_name
))
2145 router_list
= tgen
.routers()
2146 for router
in input_dict
.keys():
2147 if router
not in router_list
:
2150 rnode
= router_list
[router
]
2152 logger
.info("Verifying bgp timers functionality, DUT is %s:", router
)
2154 show_ip_bgp_neighbor_json
= run_frr_cmd(
2155 rnode
, "show ip bgp neighbor json", isjson
=True
2158 bgp_addr_type
= input_dict
[router
]["bgp"]["address_family"]
2160 for addr_type
in bgp_addr_type
:
2161 if not check_address_types(addr_type
):
2164 bgp_neighbors
= bgp_addr_type
[addr_type
]["unicast"]["neighbor"]
2165 for bgp_neighbor
, peer_data
in bgp_neighbors
.items():
2166 for dest_link
, peer_dict
in peer_data
["dest_link"].items():
2167 data
= topo
["routers"][bgp_neighbor
]["links"]
2169 keepalivetimer
= peer_dict
["keepalivetimer"]
2170 holddowntimer
= peer_dict
["holddowntimer"]
2172 if dest_link
in data
:
2173 neighbor_ip
= data
[dest_link
][addr_type
].split("/")[0]
2174 neighbor_intf
= data
[dest_link
]["interface"]
2176 # Verify HoldDownTimer for neighbor
2177 bgpHoldTimeMsecs
= show_ip_bgp_neighbor_json
[neighbor_ip
][
2178 "bgpTimerHoldTimeMsecs"
2180 if bgpHoldTimeMsecs
!= holddowntimer
* 1000:
2182 "Verifying holddowntimer for bgp "
2183 "neighbor {} under dut {}, found: {} "
2184 "but expected: {}".format(
2188 holddowntimer
* 1000,
2193 # Verify KeepAliveTimer for neighbor
2194 bgpKeepAliveTimeMsecs
= show_ip_bgp_neighbor_json
[neighbor_ip
][
2195 "bgpTimerKeepAliveIntervalMsecs"
2197 if bgpKeepAliveTimeMsecs
!= keepalivetimer
* 1000:
2199 "Verifying keepalivetimer for bgp "
2200 "neighbor {} under dut {}, found: {} "
2201 "but expected: {}".format(
2204 bgpKeepAliveTimeMsecs
,
2205 keepalivetimer
* 1000,
2210 ####################
2211 # Shutting down peer interface after keepalive time and
2212 # after some time bringing up peer interface.
2213 # verifying BGP neighborship in (hold down-keep alive)
2214 # time, it should not go down
2215 ####################
2217 # Wait till keep alive time
2218 logger
.info("=" * 20)
2219 logger
.info("Scenario 1:")
2221 "Shutdown and bring up peer interface: %s "
2222 "in keep alive time : %s sec and verify "
2223 " BGP neighborship is intact in %s sec ",
2226 (holddowntimer
- keepalivetimer
),
2228 logger
.info("=" * 20)
2229 logger
.info("Waiting for %s sec..", keepalivetimer
)
2230 sleep(keepalivetimer
)
2232 # Shutting down peer interface
2234 "Shutting down interface %s on router %s",
2238 topotest
.interface_set_status(
2239 router_list
[bgp_neighbor
], neighbor_intf
, ifaceaction
=False
2242 # Bringing up peer interface
2245 "Bringing up interface %s on router %s..",
2249 topotest
.interface_set_status(
2250 router_list
[bgp_neighbor
], neighbor_intf
, ifaceaction
=True
2253 # Verifying BGP neighborship is intact in
2254 # (holddown - keepalive) time
2256 keepalivetimer
, holddowntimer
, int(holddowntimer
/ 3)
2258 logger
.info("Waiting for %s sec..", keepalivetimer
)
2259 sleep(keepalivetimer
)
2261 show_bgp_json
= run_frr_cmd(
2262 rnode
, "show bgp summary json", isjson
=True
2265 if addr_type
== "ipv4":
2266 ipv4_data
= show_bgp_json
["ipv4Unicast"]["peers"]
2267 nh_state
= ipv4_data
[neighbor_ip
]["state"]
2269 ipv6_data
= show_bgp_json
["ipv6Unicast"]["peers"]
2270 nh_state
= ipv6_data
[neighbor_ip
]["state"]
2272 if timer
== (holddowntimer
- keepalivetimer
):
2273 if nh_state
!= "Established":
2275 "BGP neighborship has not gone "
2276 "down in {} sec for neighbor {}".format(
2283 "BGP neighborship is intact in %s"
2284 " sec for neighbor %s",
2289 ####################
2290 # Shutting down peer interface and verifying that BGP
2291 # neighborship is going down in holddown time
2292 ####################
2293 logger
.info("=" * 20)
2294 logger
.info("Scenario 2:")
2296 "Shutdown peer interface: %s and verify BGP"
2297 " neighborship has gone down in hold down "
2302 logger
.info("=" * 20)
2305 "Shutting down interface %s on router %s..",
2309 topotest
.interface_set_status(
2310 router_list
[bgp_neighbor
], neighbor_intf
, ifaceaction
=False
2313 # Verifying BGP neighborship is going down in holddown time
2316 (holddowntimer
+ keepalivetimer
),
2317 int(holddowntimer
/ 3),
2319 logger
.info("Waiting for %s sec..", keepalivetimer
)
2320 sleep(keepalivetimer
)
2322 show_bgp_json
= run_frr_cmd(
2323 rnode
, "show bgp summary json", isjson
=True
2326 if addr_type
== "ipv4":
2327 ipv4_data
= show_bgp_json
["ipv4Unicast"]["peers"]
2328 nh_state
= ipv4_data
[neighbor_ip
]["state"]
2330 ipv6_data
= show_bgp_json
["ipv6Unicast"]["peers"]
2331 nh_state
= ipv6_data
[neighbor_ip
]["state"]
2333 if timer
== holddowntimer
:
2334 if nh_state
== "Established":
2336 "BGP neighborship has not gone "
2337 "down in {} sec for neighbor {}".format(
2344 "BGP neighborship has gone down in"
2345 " %s sec for neighbor %s",
2350 logger
.debug("Exiting lib API: {}".format(sys
._getframe
().f_code
.co_name
))
2354 @retry(retry_timeout
=16)
2355 def verify_bgp_attributes(
2368 API will verify BGP attributes set by Route-map for given prefix and
2369 DUT. it will run "show bgp ipv4/ipv6 {prefix_address} json" command
2370 in DUT to verify BGP attributes set by route-map, Set attributes
2371 values will be read from input_dict and verified with command output.
2373 * `tgen`: topogen object
2374 * `addr_type` : ip type, ipv4/ipv6
2375 * `dut`: Device Under Test
2376 * `static_routes`: Static Routes for which BGP set attributes needs to be
2378 * `rmap_name`: route map name for which set criteria needs to be verified
2379 * `input_dict`: defines for which router, AS numbers needs
2380 * `seq_id`: sequence number of rmap, default is None
2381 * `expected` : expected results from API, by-default True
2385 # To verify BGP attribute "localpref" set to 150 and "med" set to 30
2386 for prefix 10.0.20.1/32 in router r3.
2390 "rmap_match_pf_list1": [
2393 "match": {"prefix_list": "pf_list_1"},
2394 "set": {"localpref": 150, "med": 30}
2398 "as_path": "500 400"
2401 static_routes (list) = ["10.0.20.1/32"]
2407 errormsg(str) or True
2410 logger
.debug("Entering lib API: {}".format(sys
._getframe
().f_code
.co_name
))
2411 for router
, rnode
in tgen
.routers().items():
2415 logger
.info("Verifying BGP set attributes for dut {}:".format(router
))
2417 for static_route
in static_routes
:
2419 cmd
= "show bgp vrf {} {} {} json".format(vrf
, addr_type
, static_route
)
2421 cmd
= "show bgp {} {} json".format(addr_type
, static_route
)
2422 show_bgp_json
= run_frr_cmd(rnode
, cmd
, isjson
=True)
2427 dict_list
= list(input_dict
.values())[0]
2429 if "route_maps" in dict_list
:
2430 for rmap_router
in input_dict
.keys():
2431 for rmap
, values
in input_dict
[rmap_router
]["route_maps"].items():
2432 if rmap
== rmap_name
:
2433 dict_to_test
= values
2434 for rmap_dict
in values
:
2435 if seq_id
is not None:
2436 if type(seq_id
) is not list:
2439 if "seq_id" in rmap_dict
:
2440 rmap_seq_id
= rmap_dict
["seq_id"]
2441 for _seq_id
in seq_id
:
2442 if _seq_id
== rmap_seq_id
:
2443 tmp_list
.append(rmap_dict
)
2445 dict_to_test
= tmp_list
2448 for rmap_dict
in dict_to_test
:
2449 if "set" in rmap_dict
:
2450 for criteria
in rmap_dict
["set"].keys():
2452 for path
in show_bgp_json
["paths"]:
2453 if criteria
not in path
:
2456 if criteria
== "aspath":
2457 value
= path
[criteria
]["string"]
2459 value
= path
[criteria
]
2461 if rmap_dict
["set"][criteria
] == value
:
2480 "Failed: Verifying BGP "
2481 "attribute {} for route:"
2482 " {} in router: {}, "
2483 " expected value: {} but"
2484 " found: {}".format(
2488 rmap_dict
["set"][criteria
],
2494 logger
.debug("Exiting lib API: {}".format(sys
._getframe
().f_code
.co_name
))
2498 @retry(retry_timeout
=8)
2499 def verify_best_path_as_per_bgp_attribute(
2500 tgen
, addr_type
, router
, input_dict
, attribute
, expected
=True
2503 API is to verify best path according to BGP attributes for given routes.
2504 "show bgp ipv4/6 json" command will be run and verify best path according
2505 to shortest as-path, highest local-preference and med, lowest weight and
2506 route origin IGP>EGP>INCOMPLETE.
2509 * `tgen` : topogen object
2510 * `addr_type` : ip type, ipv4/ipv6
2511 * `router` : device under test
2512 * `attribute` : calculate best path using this attribute
2513 * `input_dict`: defines different routes to calculate for which route
2514 best path is selected
2515 * `expected` : expected results from API, by-default True
2519 # To verify best path for routes 200.50.2.0/32 and 200.60.2.0/32 from
2520 router r7 to router r1(DUT) as per shortest as-path attribute
2527 "advertise_networks": [
2529 "network": "200.50.2.0/32"
2532 "network": "200.60.2.0/32"
2541 attribute = "locPrf"
2542 result = verify_best_path_as_per_bgp_attribute(tgen, "ipv4", dut, \
2543 input_dict, attribute)
2546 errormsg(str) or True
2549 logger
.debug("Entering lib API: {}".format(sys
._getframe
().f_code
.co_name
))
2551 if router
not in tgen
.routers():
2554 rnode
= tgen
.routers()[router
]
2556 # Verifying show bgp json
2557 command
= "show bgp"
2560 logger
.info("Verifying router %s RIB for best path:", router
)
2562 static_route
= False
2563 advertise_network
= False
2564 for route_val
in input_dict
.values():
2565 if "static_routes" in route_val
:
2567 networks
= route_val
["static_routes"]
2569 advertise_network
= True
2570 net_data
= route_val
["bgp"]["address_family"][addr_type
]["unicast"]
2571 networks
= net_data
["advertise_networks"]
2573 for network
in networks
:
2574 _network
= network
["network"]
2575 no_of_ip
= network
.setdefault("no_of_ip", 1)
2576 vrf
= network
.setdefault("vrf", None)
2579 cmd
= "{} vrf {}".format(command
, vrf
)
2583 cmd
= "{} {}".format(cmd
, addr_type
)
2584 cmd
= "{} json".format(cmd
)
2585 sh_ip_bgp_json
= run_frr_cmd(rnode
, cmd
, isjson
=True)
2587 routes
= generate_ips(_network
, no_of_ip
)
2588 for route
in routes
:
2589 route
= str(ipaddress
.ip_network(frr_unicode(route
)))
2591 if route
in sh_ip_bgp_json
["routes"]:
2592 route_attributes
= sh_ip_bgp_json
["routes"][route
]
2596 for route_attribute
in route_attributes
:
2597 next_hops
= route_attribute
["nexthops"]
2598 for next_hop
in next_hops
:
2599 next_hop_ip
= next_hop
["ip"]
2600 attribute_dict
[next_hop_ip
] = route_attribute
[attribute
]
2603 if attribute
== "path":
2604 # Find next_hop for the route have minimum as_path
2606 attribute_dict
, key
=lambda x
: len(set(attribute_dict
[x
]))
2608 compare
= "SHORTEST"
2610 # LOCAL_PREF attribute
2611 elif attribute
== "locPrf":
2612 # Find next_hop for the route have highest local preference
2614 attribute_dict
, key
=(lambda k
: attribute_dict
[k
])
2619 elif attribute
== "weight":
2620 # Find next_hop for the route have highest weight
2622 attribute_dict
, key
=(lambda k
: attribute_dict
[k
])
2627 elif attribute
== "origin":
2628 # Find next_hop for the route have IGP as origin, -
2629 # - rule is IGP>EGP>INCOMPLETE
2632 for (key
, value
) in attribute_dict
.items()
2638 elif attribute
== "metric":
2639 # Find next_hop for the route have LOWEST MED
2641 attribute_dict
, key
=(lambda k
: attribute_dict
[k
])
2646 if addr_type
== "ipv4":
2647 command_1
= "show ip route"
2649 command_1
= "show ipv6 route"
2652 cmd
= "{} vrf {} json".format(command_1
, vrf
)
2654 cmd
= "{} json".format(command_1
)
2656 rib_routes_json
= run_frr_cmd(rnode
, cmd
, isjson
=True)
2658 # Verifying output dictionary rib_routes_json is not empty
2659 if not bool(rib_routes_json
):
2660 errormsg
= "No route found in RIB of router {}..".format(router
)
2665 # Find best is installed in RIB
2666 if route
in rib_routes_json
:
2668 # Verify next_hop in rib_routes_json
2670 rib_routes_json
[route
][0]["nexthops"][0]["ip"]
2676 "Incorrect Nexthop for BGP route {} in "
2677 "RIB of router {}, Expected: {}, Found:"
2681 rib_routes_json
[route
][0]["nexthops"][0]["ip"],
2687 if st_found
and nh_found
:
2689 "Best path for prefix: %s with next_hop: %s is "
2690 "installed according to %s %s: (%s) in RIB of "
2696 attribute_dict
[_next_hop
],
2700 logger
.debug("Exiting lib API: {}".format(sys
._getframe
().f_code
.co_name
))
2704 @retry(retry_timeout
=10)
2705 def verify_best_path_as_per_admin_distance(
2706 tgen
, addr_type
, router
, input_dict
, attribute
, expected
=True, vrf
=None
2709 API is to verify best path according to admin distance for given
2710 route. "show ip/ipv6 route json" command will be run and verify
2711 best path accoring to shortest admin distanc.
2715 * `addr_type` : ip type, ipv4/ipv6
2716 * `dut`: Device Under Test
2717 * `tgen` : topogen object
2718 * `attribute` : calculate best path using admin distance
2719 * `input_dict`: defines different routes with different admin distance
2720 to calculate for which route best path is selected
2721 * `expected` : expected results from API, by-default True
2722 * `vrf`: Pass vrf name check for perticular vrf.
2726 # To verify best path for route 200.50.2.0/32 from router r2 to
2727 router r1(DUT) as per shortest admin distance which is 60.
2730 "static_routes": [{"network": "200.50.2.0/32", \
2731 "admin_distance": 80, "next_hop": "10.0.0.14"},
2732 {"network": "200.50.2.0/32", \
2733 "admin_distance": 60, "next_hop": "10.0.0.18"}]
2735 attribute = "locPrf"
2736 result = verify_best_path_as_per_admin_distance(tgen, "ipv4", dut, \
2737 input_dict, attribute):
2740 errormsg(str) or True
2743 logger
.debug("Entering lib API: {}".format(sys
._getframe
().f_code
.co_name
))
2744 router_list
= tgen
.routers()
2745 if router
not in router_list
:
2748 rnode
= tgen
.routers()[router
]
2751 logger
.info("Verifying router %s RIB for best path:", router
)
2754 if addr_type
== "ipv4":
2755 command
= "show ip route"
2757 command
= "show ipv6 route"
2760 command
= "{} vrf {} json".format(command
, vrf
)
2762 command
= "{} json".format(command
)
2764 for routes_from_router
in input_dict
.keys():
2765 sh_ip_route_json
= router_list
[routes_from_router
].vtysh_cmd(
2766 command
, isjson
=True
2768 networks
= input_dict
[routes_from_router
]["static_routes"]
2769 for network
in networks
:
2770 route
= network
["network"]
2772 route_attributes
= sh_ip_route_json
[route
]
2776 for route_attribute
in route_attributes
:
2777 next_hops
= route_attribute
["nexthops"]
2778 for next_hop
in next_hops
:
2779 next_hop_ip
= next_hop
["ip"]
2780 attribute_dict
[next_hop_ip
] = route_attribute
["distance"]
2782 # Find next_hop for the route have LOWEST Admin Distance
2783 _next_hop
= min(attribute_dict
, key
=(lambda k
: attribute_dict
[k
]))
2787 rib_routes_json
= run_frr_cmd(rnode
, command
, isjson
=True)
2789 # Verifying output dictionary rib_routes_json is not empty
2790 if not bool(rib_routes_json
):
2791 errormsg
= "No route found in RIB of router {}..".format(router
)
2796 # Find best is installed in RIB
2797 if route
in rib_routes_json
:
2799 # Verify next_hop in rib_routes_json
2802 for nh
in rib_routes_json
[route
][0]["nexthops"]
2803 if nh
["ip"] == _next_hop
2808 "Nexthop {} is Missing for BGP route {}"
2809 " in RIB of router {}\n".format(_next_hop
, route
, router
)
2813 if st_found
and nh_found
:
2815 "Best path for prefix: %s is installed according"
2816 " to %s %s: (%s) in RIB of router %s",
2820 attribute_dict
[_next_hop
],
2824 logger
.debug("Exiting lib API: {}".format(sys
._getframe
().f_code
.co_name
))
2828 @retry(retry_timeout
=30)
2840 This API is to verify whether bgp rib has any
2841 matching route for a nexthop.
2845 * `tgen`: topogen object
2846 * `dut`: input dut router name
2847 * `addr_type` : ip type ipv4/ipv6
2848 * `input_dict` : input dict, has details of static routes
2849 * `next_hop`[optional]: next_hop which needs to be verified,
2851 * 'aspath'[optional]: aspath which needs to be verified
2852 * `expected` : expected results from API, by-default True
2857 next_hop = "192.168.1.10"
2858 input_dict = topo['routers']
2859 aspath = "100 200 300"
2860 result = verify_bgp_rib(tgen, addr_type, dut, tgen, input_dict,
2865 errormsg(str) or True
2868 logger
.debug("Entering lib API: verify_bgp_rib()")
2870 router_list
= tgen
.routers()
2871 additional_nexthops_in_required_nhs
= []
2875 for routerInput
in input_dict
.keys():
2876 for router
, rnode
in router_list
.items():
2880 # Verifying RIB routes
2881 command
= "show bgp"
2885 logger
.info("Checking router {} BGP RIB:".format(dut
))
2887 if "static_routes" in input_dict
[routerInput
]:
2888 static_routes
= input_dict
[routerInput
]["static_routes"]
2890 for static_route
in static_routes
:
2896 vrf
= static_route
.setdefault("vrf", None)
2897 community
= static_route
.setdefault("community", None)
2898 largeCommunity
= static_route
.setdefault("largeCommunity", None)
2901 cmd
= "{} vrf {} {}".format(command
, vrf
, addr_type
)
2904 cmd
= "{} community {}".format(cmd
, community
)
2907 cmd
= "{} large-community {}".format(cmd
, largeCommunity
)
2909 cmd
= "{} {}".format(command
, addr_type
)
2911 cmd
= "{} json".format(cmd
)
2913 rib_routes_json
= run_frr_cmd(rnode
, cmd
, isjson
=True)
2915 # Verifying output dictionary rib_routes_json is not empty
2916 if bool(rib_routes_json
) == False:
2917 errormsg
= "No route found in rib of router {}..".format(router
)
2920 network
= static_route
["network"]
2922 if "no_of_ip" in static_route
:
2923 no_of_ip
= static_route
["no_of_ip"]
2927 # Generating IPs for verification
2928 ip_list
= generate_ips(network
, no_of_ip
)
2930 for st_rt
in ip_list
:
2931 st_rt
= str(ipaddress
.ip_network(frr_unicode(st_rt
)))
2933 _addr_type
= validate_ip_address(st_rt
)
2934 if _addr_type
!= addr_type
:
2937 if st_rt
in rib_routes_json
["routes"]:
2939 found_routes
.append(st_rt
)
2941 if next_hop
and multi_nh
and st_found
:
2942 if type(next_hop
) is not list:
2943 next_hop
= [next_hop
]
2947 0, len(rib_routes_json
["routes"][st_rt
])
2952 for rib_r
in rib_routes_json
["routes"][
2957 for mnh
in found_hops
:
2958 for each_nh_in_multipath
in mnh
:
2959 list2
.append(each_nh_in_multipath
)
2961 missing_list_of_nexthops
= set(list2
).difference(
2964 additional_nexthops_in_required_nhs
= set(
2969 if additional_nexthops_in_required_nhs
:
2971 "Missing nexthop %s for route"
2972 " %s in RIB of router %s\n",
2973 additional_nexthops_in_required_nhs
,
2980 elif next_hop
and multi_nh
is None:
2981 if type(next_hop
) is not list:
2982 next_hop
= [next_hop
]
2986 for rib_r
in rib_routes_json
["routes"][st_rt
][0][
2991 missing_list_of_nexthops
= set(list2
).difference(list1
)
2992 additional_nexthops_in_required_nhs
= set(
2997 if additional_nexthops_in_required_nhs
:
2999 "Missing nexthop %s for route"
3000 " %s in RIB of router %s\n",
3001 additional_nexthops_in_required_nhs
,
3006 "Nexthop {} is Missing for "
3007 "route {} in RIB of router {}\n".format(
3008 additional_nexthops_in_required_nhs
,
3018 found_paths
= rib_routes_json
["routes"][st_rt
][0][
3021 if aspath
== found_paths
:
3024 "Found AS path {} for route"
3025 " {} in RIB of router "
3026 "{}\n".format(aspath
, st_rt
, dut
)
3030 "AS Path {} is missing for route"
3031 "for route {} in RIB of router {}\n".format(
3038 missing_routes
.append(st_rt
)
3042 "Found next_hop {} for all bgp"
3044 " router {}\n".format(next_hop
, router
)
3047 if len(missing_routes
) > 0:
3049 "Missing route in RIB of router {}, "
3050 "routes: {}\n".format(dut
, missing_routes
)
3056 "Verified routes in router {} BGP RIB, "
3057 "found routes are: {} \n".format(dut
, found_routes
)
3061 if "bgp" not in input_dict
[routerInput
]:
3064 # Advertise networks
3065 bgp_data_list
= input_dict
[routerInput
]["bgp"]
3067 if type(bgp_data_list
) is not list:
3068 bgp_data_list
= [bgp_data_list
]
3070 for bgp_data
in bgp_data_list
:
3071 vrf_id
= bgp_data
.setdefault("vrf", None)
3073 cmd
= "{} vrf {} {}".format(command
, vrf_id
, addr_type
)
3075 cmd
= "{} {}".format(command
, addr_type
)
3077 cmd
= "{} json".format(cmd
)
3079 rib_routes_json
= run_frr_cmd(rnode
, cmd
, isjson
=True)
3081 # Verifying output dictionary rib_routes_json is not empty
3082 if bool(rib_routes_json
) == False:
3083 errormsg
= "No route found in rib of router {}..".format(router
)
3086 bgp_net_advertise
= bgp_data
["address_family"][addr_type
]["unicast"]
3087 advertise_network
= bgp_net_advertise
.setdefault(
3088 "advertise_networks", []
3091 for advertise_network_dict
in advertise_network
:
3096 network
= advertise_network_dict
["network"]
3098 if "no_of_network" in advertise_network_dict
:
3099 no_of_network
= advertise_network_dict
["no_of_network"]
3103 # Generating IPs for verification
3104 ip_list
= generate_ips(network
, no_of_network
)
3106 for st_rt
in ip_list
:
3107 st_rt
= str(ipaddress
.ip_network(frr_unicode(st_rt
)))
3109 _addr_type
= validate_ip_address(st_rt
)
3110 if _addr_type
!= addr_type
:
3113 if st_rt
in rib_routes_json
["routes"]:
3115 found_routes
.append(st_rt
)
3118 missing_routes
.append(st_rt
)
3120 if len(missing_routes
) > 0:
3122 "Missing route in BGP RIB of router {},"
3123 " are: {}\n".format(dut
, missing_routes
)
3129 "Verified routes in router {} BGP RIB, found "
3130 "routes are: {}\n".format(dut
, found_routes
)
3133 logger
.debug("Exiting lib API: verify_bgp_rib()")
3137 @retry(retry_timeout
=10)
3138 def verify_graceful_restart(
3139 tgen
, topo
, addr_type
, input_dict
, dut
, peer
, expected
=True
3142 This API is to verify verify_graceful_restart configuration of DUT and
3143 cross verify the same from the peer bgp routerrouter.
3147 * `tgen`: topogen object
3148 * `topo`: input json file data
3149 * `addr_type` : ip type ipv4/ipv6
3150 * `input_dict`: input dictionary, have details of Device Under Test, for
3151 which user wants to test the data
3152 * `dut`: input dut router name
3153 * `peer`: input peer router name
3154 * `expected` : expected results from API, by-default True
3167 "graceful-restart": True
3180 "graceful-restart": True
3192 result = verify_graceful_restart(tgen, topo, addr_type, input_dict,
3193 dut = "r1", peer = 'r2')
3196 errormsg(str) or True
3199 logger
.debug("Entering lib API: {}".format(sys
._getframe
().f_code
.co_name
))
3201 for router
, rnode
in tgen
.routers().items():
3206 bgp_addr_type
= topo
["routers"][dut
]["bgp"]["address_family"]
3208 bgp_addr_type
= topo
["routers"][dut
]["bgp"][0]["address_family"]
3210 # bgp_addr_type = topo["routers"][dut]["bgp"]["address_family"]
3212 if addr_type
in bgp_addr_type
:
3213 if not check_address_types(addr_type
):
3216 bgp_neighbors
= bgp_addr_type
[addr_type
]["unicast"]["neighbor"]
3218 for bgp_neighbor
, peer_data
in bgp_neighbors
.items():
3219 if bgp_neighbor
!= peer
:
3222 for dest_link
, peer_dict
in peer_data
["dest_link"].items():
3223 data
= topo
["routers"][bgp_neighbor
]["links"]
3225 if dest_link
in data
:
3226 neighbor_ip
= data
[dest_link
][addr_type
].split("/")[0]
3229 "[DUT: {}]: Checking bgp graceful-restart show"
3230 " o/p {}".format(dut
, neighbor_ip
)
3233 show_bgp_graceful_json
= None
3235 show_bgp_graceful_json
= run_frr_cmd(
3237 "show bgp {} neighbor {} graceful-restart json".format(
3238 addr_type
, neighbor_ip
3243 show_bgp_graceful_json_out
= show_bgp_graceful_json
[neighbor_ip
]
3245 if show_bgp_graceful_json_out
["neighborAddr"] == neighbor_ip
:
3247 "[DUT: {}]: Neighbor ip matched {}".format(dut
, neighbor_ip
)
3250 errormsg
= "[DUT: {}]: Neighbor ip NOT a matched {}".format(
3258 if "address_family" in input_dict
[dut
]["bgp"]:
3259 bgp_neighbors
= input_dict
[dut
]["bgp"]["address_family"][addr_type
][
3261 ]["neighbor"][peer
]["dest_link"]
3263 for dest_link
, data
in bgp_neighbors
.items():
3265 "graceful-restart-helper" in data
3266 and data
["graceful-restart-helper"]
3269 elif "graceful-restart" in data
and data
["graceful-restart"]:
3272 "graceful-restart-disable" in data
3273 and data
["graceful-restart-disable"]
3280 if "graceful-restart" in input_dict
[dut
]["bgp"]:
3283 "graceful-restart" in input_dict
[dut
]["bgp"]["graceful-restart"]
3284 and input_dict
[dut
]["bgp"]["graceful-restart"][
3290 "graceful-restart-disable"
3291 in input_dict
[dut
]["bgp"]["graceful-restart"]
3292 and input_dict
[dut
]["bgp"]["graceful-restart"][
3293 "graceful-restart-disable"
3302 if lmode
== "Disable" or lmode
== "Disable*":
3306 if "address_family" in input_dict
[peer
]["bgp"]:
3307 bgp_neighbors
= input_dict
[peer
]["bgp"]["address_family"][addr_type
][
3309 ]["neighbor"][dut
]["dest_link"]
3311 for dest_link
, data
in bgp_neighbors
.items():
3313 "graceful-restart-helper" in data
3314 and data
["graceful-restart-helper"]
3317 elif "graceful-restart" in data
and data
["graceful-restart"]:
3320 "graceful-restart-disable" in data
3321 and data
["graceful-restart-disable"]
3328 if "graceful-restart" in input_dict
[peer
]["bgp"]:
3332 in input_dict
[peer
]["bgp"]["graceful-restart"]
3333 and input_dict
[peer
]["bgp"]["graceful-restart"][
3339 "graceful-restart-disable"
3340 in input_dict
[peer
]["bgp"]["graceful-restart"]
3341 and input_dict
[peer
]["bgp"]["graceful-restart"][
3342 "graceful-restart-disable"
3351 if show_bgp_graceful_json_out
["localGrMode"] == lmode
:
3353 "[DUT: {}]: localGrMode : {} ".format(
3354 dut
, show_bgp_graceful_json_out
["localGrMode"]
3359 "[DUT: {}]: localGrMode is not correct"
3360 " Expected: {}, Found: {}".format(
3361 dut
, lmode
, show_bgp_graceful_json_out
["localGrMode"]
3366 if show_bgp_graceful_json_out
["remoteGrMode"] == rmode
:
3368 "[DUT: {}]: remoteGrMode : {} ".format(
3369 dut
, show_bgp_graceful_json_out
["remoteGrMode"]
3373 show_bgp_graceful_json_out
["remoteGrMode"] == "NotApplicable"
3374 and rmode
== "Disable"
3377 "[DUT: {}]: remoteGrMode : {} ".format(
3378 dut
, show_bgp_graceful_json_out
["remoteGrMode"]
3383 "[DUT: {}]: remoteGrMode is not correct"
3384 " Expected: {}, Found: {}".format(
3385 dut
, rmode
, show_bgp_graceful_json_out
["remoteGrMode"]
3390 logger
.debug("Exiting lib API: {}".format(sys
._getframe
().f_code
.co_name
))
3394 @retry(retry_timeout
=10)
3395 def verify_r_bit(tgen
, topo
, addr_type
, input_dict
, dut
, peer
, expected
=True):
3397 This API is to verify r_bit in the BGP gr capability advertised
3398 by the neighbor router
3402 * `tgen`: topogen object
3403 * `topo`: input json file data
3404 * `addr_type` : ip type ipv4/ipv6
3405 * `input_dict`: input dictionary, have details of Device Under Test, for
3406 which user wants to test the data
3407 * `dut`: input dut router name
3409 * `expected` : expected results from API, by-default True
3423 "graceful-restart": True
3436 "graceful-restart": True
3447 result = verify_r_bit(tgen, topo, addr_type, input_dict, dut, peer)
3451 errormsg(str) or True
3454 logger
.debug("Entering lib API: {}".format(sys
._getframe
().f_code
.co_name
))
3456 for router
, rnode
in tgen
.routers().items():
3460 bgp_addr_type
= topo
["routers"][router
]["bgp"]["address_family"]
3462 if addr_type
in bgp_addr_type
:
3463 if not check_address_types(addr_type
):
3466 bgp_neighbors
= bgp_addr_type
[addr_type
]["unicast"]["neighbor"]
3468 for bgp_neighbor
, peer_data
in bgp_neighbors
.items():
3469 if bgp_neighbor
!= peer
:
3472 for dest_link
, peer_dict
in peer_data
["dest_link"].items():
3473 data
= topo
["routers"][bgp_neighbor
]["links"]
3475 if dest_link
in data
:
3476 neighbor_ip
= data
[dest_link
][addr_type
].split("/")[0]
3479 "[DUT: {}]: Checking bgp graceful-restart show"
3480 " o/p {}".format(dut
, neighbor_ip
)
3483 show_bgp_graceful_json
= run_frr_cmd(
3485 "show bgp {} neighbor {} graceful-restart json".format(
3486 addr_type
, neighbor_ip
3491 show_bgp_graceful_json_out
= show_bgp_graceful_json
[neighbor_ip
]
3493 if show_bgp_graceful_json_out
["neighborAddr"] == neighbor_ip
:
3495 "[DUT: {}]: Neighbor ip matched {}".format(dut
, neighbor_ip
)
3498 errormsg
= "[DUT: {}]: Neighbor ip NOT a matched {}".format(
3503 if "rBit" in show_bgp_graceful_json_out
:
3504 if show_bgp_graceful_json_out
["rBit"]:
3505 logger
.info("[DUT: {}]: Rbit true {}".format(dut
, neighbor_ip
))
3507 errormsg
= "[DUT: {}]: Rbit false {}".format(dut
, neighbor_ip
)
3510 logger
.debug("Exiting lib API: {}".format(sys
._getframe
().f_code
.co_name
))
3514 @retry(retry_timeout
=10)
3515 def verify_eor(tgen
, topo
, addr_type
, input_dict
, dut
, peer
, expected
=True):
3517 This API is to verify EOR
3521 * `tgen`: topogen object
3522 * `topo`: input json file data
3523 * `addr_type` : ip type ipv4/ipv6
3524 * `input_dict`: input dictionary, have details of DUT, for
3525 which user wants to test the data
3526 * `dut`: input dut router name
3541 "graceful-restart": True
3554 "graceful-restart": True
3566 result = verify_eor(tgen, topo, addr_type, input_dict, dut, peer)
3570 errormsg(str) or True
3572 logger
.debug("Entering lib API: {}".format(sys
._getframe
().f_code
.co_name
))
3574 for router
, rnode
in tgen
.routers().items():
3578 bgp_addr_type
= topo
["routers"][router
]["bgp"]["address_family"]
3580 if addr_type
in bgp_addr_type
:
3581 if not check_address_types(addr_type
):
3584 bgp_neighbors
= bgp_addr_type
[addr_type
]["unicast"]["neighbor"]
3586 for bgp_neighbor
, peer_data
in bgp_neighbors
.items():
3587 if bgp_neighbor
!= peer
:
3590 for dest_link
, peer_dict
in peer_data
["dest_link"].items():
3591 data
= topo
["routers"][bgp_neighbor
]["links"]
3593 if dest_link
in data
:
3594 neighbor_ip
= data
[dest_link
][addr_type
].split("/")[0]
3597 "[DUT: %s]: Checking bgp graceful-restart" " show o/p %s",
3602 show_bgp_graceful_json
= run_frr_cmd(
3604 "show bgp {} neighbor {} graceful-restart json".format(
3605 addr_type
, neighbor_ip
3610 show_bgp_graceful_json_out
= show_bgp_graceful_json
[neighbor_ip
]
3612 if show_bgp_graceful_json_out
["neighborAddr"] == neighbor_ip
:
3613 logger
.info("[DUT: %s]: Neighbor ip matched %s", dut
, neighbor_ip
)
3615 errormsg
= "[DUT: %s]: Neighbor ip is NOT matched %s" % (
3621 if addr_type
== "ipv4":
3623 elif addr_type
== "ipv6":
3626 errormsg
= "Address type %s is not supported" % (addr_type
)
3629 eor_json
= show_bgp_graceful_json_out
[afi
]["endOfRibStatus"]
3630 if "endOfRibSend" in eor_json
:
3632 if eor_json
["endOfRibSend"]:
3634 "[DUT: %s]: EOR Send true for %s " "%s", dut
, neighbor_ip
, afi
3637 errormsg
= "[DUT: %s]: EOR Send false for %s" " %s" % (
3644 if "endOfRibRecv" in eor_json
:
3645 if eor_json
["endOfRibRecv"]:
3647 "[DUT: %s]: EOR Recv true %s " "%s", dut
, neighbor_ip
, afi
3650 errormsg
= "[DUT: %s]: EOR Recv false %s " "%s" % (
3657 if "endOfRibSentAfterUpdate" in eor_json
:
3658 if eor_json
["endOfRibSentAfterUpdate"]:
3660 "[DUT: %s]: EOR SendTime true for %s" " %s",
3666 errormsg
= "[DUT: %s]: EOR SendTime false for " "%s %s" % (
3673 logger
.debug("Exiting lib API: {}".format(sys
._getframe
().f_code
.co_name
))
3677 @retry(retry_timeout
=8)
3678 def verify_f_bit(tgen
, topo
, addr_type
, input_dict
, dut
, peer
, expected
=True):
3680 This API is to verify f_bit in the BGP gr capability advertised
3681 by the neighbor router
3685 * `tgen`: topogen object
3686 * `topo`: input json file data
3687 * `addr_type` : ip type ipv4/ipv6
3688 * `input_dict`: input dictionary, have details of Device Under Test, for
3689 which user wants to test the data
3690 * `dut`: input dut router name
3692 * `expected` : expected results from API, by-default True
3706 "graceful-restart": True
3719 "graceful-restart": True
3731 result = verify_f_bit(tgen, topo, 'ipv4', input_dict, dut, peer)
3735 errormsg(str) or True
3738 logger
.debug("Entering lib API: {}".format(sys
._getframe
().f_code
.co_name
))
3740 for router
, rnode
in tgen
.routers().items():
3744 bgp_addr_type
= topo
["routers"][router
]["bgp"]["address_family"]
3746 if addr_type
in bgp_addr_type
:
3747 if not check_address_types(addr_type
):
3750 bgp_neighbors
= bgp_addr_type
[addr_type
]["unicast"]["neighbor"]
3752 for bgp_neighbor
, peer_data
in bgp_neighbors
.items():
3753 if bgp_neighbor
!= peer
:
3756 for dest_link
, peer_dict
in peer_data
["dest_link"].items():
3757 data
= topo
["routers"][bgp_neighbor
]["links"]
3759 if dest_link
in data
:
3760 neighbor_ip
= data
[dest_link
][addr_type
].split("/")[0]
3763 "[DUT: {}]: Checking bgp graceful-restart show"
3764 " o/p {}".format(dut
, neighbor_ip
)
3767 show_bgp_graceful_json
= run_frr_cmd(
3769 "show bgp {} neighbor {} graceful-restart json".format(
3770 addr_type
, neighbor_ip
3775 show_bgp_graceful_json_out
= show_bgp_graceful_json
[neighbor_ip
]
3777 if show_bgp_graceful_json_out
["neighborAddr"] == neighbor_ip
:
3779 "[DUT: {}]: Neighbor ip matched {}".format(dut
, neighbor_ip
)
3782 errormsg
= "[DUT: {}]: Neighbor ip NOT a match {}".format(
3787 if "ipv4Unicast" in show_bgp_graceful_json_out
:
3788 if show_bgp_graceful_json_out
["ipv4Unicast"]["fBit"]:
3790 "[DUT: {}]: Fbit True for {} IPv4"
3791 " Unicast".format(dut
, neighbor_ip
)
3794 errormsg
= "[DUT: {}]: Fbit False for {} IPv4" " Unicast".format(
3799 elif "ipv6Unicast" in show_bgp_graceful_json_out
:
3800 if show_bgp_graceful_json_out
["ipv6Unicast"]["fBit"]:
3802 "[DUT: {}]: Fbit True for {} IPv6"
3803 " Unicast".format(dut
, neighbor_ip
)
3806 errormsg
= "[DUT: {}]: Fbit False for {} IPv6" " Unicast".format(
3811 show_bgp_graceful_json_out
["ipv4Unicast"]
3812 show_bgp_graceful_json_out
["ipv6Unicast"]
3814 logger
.debug("Exiting lib API: {}".format(sys
._getframe
().f_code
.co_name
))
3818 @retry(retry_timeout
=10)
3819 def verify_graceful_restart_timers(tgen
, topo
, addr_type
, input_dict
, dut
, peer
):
3821 This API is to verify graceful restart timers, configured and received
3825 * `tgen`: topogen object
3826 * `topo`: input json file data
3827 * `addr_type` : ip type ipv4/ipv6
3828 * `input_dict`: input dictionary, have details of Device Under Test,
3829 for which user wants to test the data
3830 * `dut`: input dut router name
3832 * `expected` : expected results from API, by-default True
3836 # Configure graceful-restart
3842 "graceful-restart": "graceful-restart-helper"
3845 "gracefulrestart": ["restart-time 150"]
3852 "graceful-restart": "graceful-restart"
3859 result = verify_graceful_restart_timers(tgen, topo, 'ipv4', input_dict)
3863 errormsg(str) or True
3866 logger
.debug("Entering lib API: {}".format(sys
._getframe
().f_code
.co_name
))
3868 for router
, rnode
in tgen
.routers().items():
3872 bgp_addr_type
= topo
["routers"][dut
]["bgp"]["address_family"]
3874 if addr_type
in bgp_addr_type
:
3875 if not check_address_types(addr_type
):
3878 bgp_neighbors
= bgp_addr_type
[addr_type
]["unicast"]["neighbor"]
3880 for bgp_neighbor
, peer_data
in bgp_neighbors
.items():
3881 if bgp_neighbor
!= peer
:
3884 for dest_link
, peer_dict
in peer_data
["dest_link"].items():
3885 data
= topo
["routers"][bgp_neighbor
]["links"]
3887 if dest_link
in data
:
3888 neighbor_ip
= data
[dest_link
][addr_type
].split("/")[0]
3891 "[DUT: {}]: Checking bgp graceful-restart show"
3892 " o/p {}".format(dut
, neighbor_ip
)
3895 show_bgp_graceful_json
= run_frr_cmd(
3897 "show bgp {} neighbor {} graceful-restart json".format(
3898 addr_type
, neighbor_ip
3903 show_bgp_graceful_json_out
= show_bgp_graceful_json
[neighbor_ip
]
3904 if show_bgp_graceful_json_out
["neighborAddr"] == neighbor_ip
:
3906 "[DUT: {}]: Neighbor ip matched {}".format(dut
, neighbor_ip
)
3909 errormsg
= "[DUT: {}]: Neighbor ip is NOT matched {}".format(
3914 # Graceful-restart timer
3915 if "graceful-restart" in input_dict
[peer
]["bgp"]:
3916 if "timer" in input_dict
[peer
]["bgp"]["graceful-restart"]:
3917 for rs_timer
, value
in input_dict
[peer
]["bgp"]["graceful-restart"][
3920 if rs_timer
== "restart-time":
3922 receivedTimer
= value
3924 show_bgp_graceful_json_out
["timers"][
3925 "receivedRestartTimer"
3930 "receivedRestartTimer is {}"
3931 " on {} from peer {}".format(
3932 receivedTimer
, router
, peer
3937 "receivedRestartTimer is not"
3938 " as expected {}".format(receivedTimer
)
3942 logger
.debug("Exiting lib API: {}".format(sys
._getframe
().f_code
.co_name
))
3946 @retry(retry_timeout
=8)
3947 def verify_gr_address_family(
3948 tgen
, topo
, addr_type
, addr_family
, dut
, peer
, expected
=True
3951 This API is to verify gr_address_family in the BGP gr capability advertised
3952 by the neighbor router
3956 * `tgen`: topogen object
3957 * `topo`: input json file data
3958 * `addr_type` : ip type ipv4/ipv6
3959 * `addr_type` : ip type IPV4 Unicast/IPV6 Unicast
3960 * `dut`: input dut router name
3961 * `peer`: input peer router to check
3962 * `expected` : expected results from API, by-default True
3967 result = verify_gr_address_family(tgen, topo, "ipv4", "ipv4Unicast", "r1", "r3")
3971 errormsg(str) or None
3974 logger
.debug("Entering lib API: {}".format(sys
._getframe
().f_code
.co_name
))
3976 if not check_address_types(addr_type
):
3977 logger
.debug("Exiting lib API: {}".format(sys
._getframe
().f_code
.co_name
))
3980 routers
= tgen
.routers()
3981 if dut
not in routers
:
3982 return "{} not in routers".format(dut
)
3984 rnode
= routers
[dut
]
3985 bgp_addr_type
= topo
["routers"][dut
]["bgp"]["address_family"]
3987 if addr_type
not in bgp_addr_type
:
3988 return "{} not in bgp_addr_types".format(addr_type
)
3990 if peer
not in bgp_addr_type
[addr_type
]["unicast"]["neighbor"]:
3991 return "{} not a peer of {} over {}".format(peer
, dut
, addr_type
)
3993 nbr_links
= topo
["routers"][peer
]["links"]
3994 if dut
not in nbr_links
or addr_type
not in nbr_links
[dut
]:
3995 return "peer {} missing back link to {} over {}".format(peer
, dut
, addr_type
)
3997 neighbor_ip
= nbr_links
[dut
][addr_type
].split("/")[0]
4000 "[DUT: {}]: Checking bgp graceful-restart show o/p {} for {}".format(
4001 dut
, neighbor_ip
, addr_family
4005 show_bgp_graceful_json
= run_frr_cmd(
4007 "show bgp {} neighbor {} graceful-restart json".format(addr_type
, neighbor_ip
),
4011 show_bgp_graceful_json_out
= show_bgp_graceful_json
[neighbor_ip
]
4013 if show_bgp_graceful_json_out
["neighborAddr"] == neighbor_ip
:
4014 logger
.info("Neighbor ip matched {}".format(neighbor_ip
))
4016 errormsg
= "Neighbor ip NOT a match {}".format(neighbor_ip
)
4019 if addr_family
== "ipv4Unicast":
4020 if "ipv4Unicast" in show_bgp_graceful_json_out
:
4021 logger
.info("ipv4Unicast present for {} ".format(neighbor_ip
))
4024 errormsg
= "ipv4Unicast NOT present for {} ".format(neighbor_ip
)
4027 elif addr_family
== "ipv6Unicast":
4028 if "ipv6Unicast" in show_bgp_graceful_json_out
:
4029 logger
.info("ipv6Unicast present for {} ".format(neighbor_ip
))
4032 errormsg
= "ipv6Unicast NOT present for {} ".format(neighbor_ip
)
4035 errormsg
= "Aaddress family: {} present for {} ".format(
4036 addr_family
, neighbor_ip
4040 logger
.debug("Exiting lib API: {}".format(sys
._getframe
().f_code
.co_name
))
4043 @retry(retry_timeout
=12)
4044 def verify_attributes_for_evpn_routes(
4058 API to verify rd and rt value using "sh bgp l2vpn evpn 10.1.1.1"
4062 * `tgen`: topogen object
4063 * `topo` : json file data
4064 * `dut` : device under test
4065 * `input_dict`: having details like - for which route, rd value
4066 needs to be verified
4067 * `rd` : route distinguisher
4068 * `rt` : route target
4069 * `ethTag` : Ethernet Tag
4070 * `ipLen` : IP prefix length
4071 * `rd_peer` : Peer name from which RD will be auto-generated
4072 * `rt_peer` : Peer name from which RT will be auto-generated
4073 * `expected` : expected results from API, by-default True
4080 "network": [NETWORK1_1[addr_type]],
4081 "next_hop": NEXT_HOP_IP[addr_type],
4087 result = verify_attributes_for_evpn_routes(tgen, topo,
4088 input_dict, rd = "10.0.0.33:1")
4092 errormsg(str) or True
4095 logger
.debug("Entering lib API: {}".format(sys
._getframe
().f_code
.co_name
))
4096 for router
in input_dict
.keys():
4097 rnode
= tgen
.routers()[dut
]
4099 if "static_routes" in input_dict
[router
]:
4100 for static_route
in input_dict
[router
]["static_routes"]:
4101 network
= static_route
["network"]
4103 if "vrf" in static_route
:
4104 vrf
= static_route
["vrf"]
4106 if type(network
) is not list:
4109 for route
in network
:
4110 route
= route
.split("/")[0]
4111 _addr_type
= validate_ip_address(route
)
4112 if "v4" in _addr_type
:
4114 elif "v6" in _addr_type
:
4117 cmd
= "show bgp l2vpn evpn {} json".format(route
)
4118 evpn_rd_value_json
= run_frr_cmd(rnode
, cmd
, isjson
=True)
4119 if not bool(evpn_rd_value_json
):
4120 errormsg
= "No output for '{}' cli".format(cmd
)
4123 if rd
is not None and rd
!= "auto":
4125 "[DUT: %s]: Verifying rd value for " "evpn route %s:",
4130 if rd
in evpn_rd_value_json
:
4131 rd_value_json
= evpn_rd_value_json
[rd
]
4132 if rd_value_json
["rd"] != rd
:
4134 "[DUT: %s] Failed: Verifying"
4135 " RD value for EVPN route: %s"
4136 "[FAILED]!!, EXPECTED : %s "
4138 % (dut
, route
, rd
, rd_value_json
["rd"])
4144 "[DUT %s]: Verifying RD value for"
4145 " EVPN route: %s [PASSED]|| "
4146 "Found Expected: %s",
4155 "[DUT: %s] RD : %s is not present"
4156 " in cli json output" % (dut
, rd
)
4162 "[DUT: %s]: Verifying auto-rd value for " "evpn route %s:",
4171 rnode
= tgen
.routers()[rd_peer
]
4172 vrfs
= topo
["routers"][rd_peer
]["vrfs"]
4173 for vrf_dict
in vrfs
:
4174 vni_dict
[vrf_dict
["name"]] = index
4177 show_bgp_json
= run_frr_cmd(
4178 rnode
, "show bgp vrf all summary json", isjson
=True
4181 # Verifying output dictionary show_bgp_json is empty
4182 if not bool(show_bgp_json
):
4183 errormsg
= "BGP is not running"
4186 show_bgp_json_vrf
= show_bgp_json
[vrf
]
4187 for afi
, afi_data
in show_bgp_json_vrf
.items():
4188 if input_afi
not in afi
:
4190 router_id
= afi_data
["routerId"]
4193 rd
= "{}:{}".format(router_id
, vni_dict
[vrf
])
4194 for _rd
, rd_value_json
in evpn_rd_value_json
.items():
4196 str(rd_value_json
["rd"].split(":")[0])
4201 if int(rd_value_json
["rd"].split(":")[1]) > 0:
4206 "[DUT %s]: Verifying RD value for"
4208 "Found Expected: %s",
4211 rd_value_json
["rd"],
4216 "[DUT: %s] Failed: Verifying"
4217 " RD value for EVPN route: %s"
4218 " FOUND : %s" % (dut
, route
, rd_value_json
["rd"])
4225 "[DUT: %s]: Verifying auto-rt value for " "evpn route %s:",
4231 rnode
= tgen
.routers()[rt_peer
]
4232 show_bgp_json
= run_frr_cmd(
4233 rnode
, "show bgp vrf all summary json", isjson
=True
4236 # Verifying output dictionary show_bgp_json is empty
4237 if not bool(show_bgp_json
):
4238 errormsg
= "BGP is not running"
4241 show_bgp_json_vrf
= show_bgp_json
[vrf
]
4242 for afi
, afi_data
in show_bgp_json_vrf
.items():
4243 if input_afi
not in afi
:
4245 as_num
= afi_data
["as"]
4247 show_vrf_vni_json
= run_frr_cmd(
4248 rnode
, "show vrf vni json", isjson
=True
4251 vrfs
= show_vrf_vni_json
["vrfs"]
4252 for vrf_dict
in vrfs
:
4253 if vrf_dict
["vrf"] == vrf
:
4254 vni_dict
[vrf_dict
["vrf"]] = str(vrf_dict
["vni"])
4256 # If AS is 4 byte, FRR uses only the lower 2 bytes of ASN+VNI
4257 # for auto derived RT value.
4259 as_bin
= bin(as_num
)
4260 as_bin
= as_bin
[-16:]
4261 as_num
= int(as_bin
, 2)
4263 rt
= "{}:{}".format(str(as_num
), vni_dict
[vrf
])
4264 for _rd
, route_data
in evpn_rd_value_json
.items():
4265 if route_data
["ip"] == route
:
4266 for rt_data
in route_data
["paths"]:
4267 if vni_dict
[vrf
] == rt_data
["vni"]:
4268 rt_string
= rt_data
["extendedCommunity"][
4271 rt_input
= "RT:{}".format(rt
)
4272 if rt_input
not in rt_string
:
4281 % (dut
, route
, rt_input
, rt_string
)
4287 "[DUT %s]: Verifying "
4288 "RT value for EVPN "
4289 "route: %s [PASSED]||"
4290 "Found Expected: %s",
4299 "[DUT: %s] Route : %s is not"
4300 " present in cli json output" % (dut
, route
)
4304 if rt
is not None and rt
!= "auto":
4306 "[DUT: %s]: Verifying rt value for " "evpn route %s:",
4311 if type(rt
) is not list:
4315 for _rd
, route_data
in evpn_rd_value_json
.items():
4316 if route_data
["ip"] == route
:
4317 for rt_data
in route_data
["paths"]:
4318 rt_string
= rt_data
["extendedCommunity"][
4321 rt_input
= "RT:{}".format(_rt
)
4322 if rt_input
not in rt_string
:
4324 "[DUT: %s] Failed: "
4325 "Verifying RT value "
4326 "for EVPN route: %s"
4330 % (dut
, route
, rt_input
, rt_string
)
4336 "[DUT %s]: Verifying RT"
4337 " value for EVPN route:"
4339 "Found Expected: %s",
4348 "[DUT: %s] Route : %s is not"
4349 " present in cli json output" % (dut
, route
)
4353 if ethTag
is not None:
4355 "[DUT: %s]: Verifying ethTag value for " "evpn route :", dut
4358 for _rd
, route_data
in evpn_rd_value_json
.items():
4359 if route_data
["ip"] == route
:
4360 if route_data
["ethTag"] != ethTag
:
4362 "[DUT: %s] RD: %s, Failed: "
4363 "Verifying ethTag value "
4364 "for EVPN route: %s"
4373 route_data
["ethTag"],
4380 "[DUT %s]: RD: %s, Verifying "
4381 "ethTag value for EVPN route:"
4383 "Found Expected: %s",
4393 "[DUT: %s] RD: %s, Route : %s "
4394 "is not present in cli json "
4395 "output" % (dut
, _rd
, route
)
4399 if ipLen
is not None:
4401 "[DUT: %s]: Verifying ipLen value for " "evpn route :", dut
4404 for _rd
, route_data
in evpn_rd_value_json
.items():
4405 if route_data
["ip"] == route
:
4406 if route_data
["ipLen"] != int(ipLen
):
4408 "[DUT: %s] RD: %s, Failed: "
4409 "Verifying ipLen value "
4410 "for EVPN route: %s"
4414 % (dut
, _rd
, route
, ipLen
, route_data
["ipLen"])
4420 "[DUT %s]: RD: %s, Verifying "
4421 "ipLen value for EVPN route:"
4423 "Found Expected: %s",
4433 "[DUT: %s] RD: %s, Route : %s "
4434 "is not present in cli json "
4435 "output " % (dut
, _rd
, route
)
4439 logger
.debug("Exiting lib API: {}".format(sys
._getframe
().f_code
.co_name
))
4443 @retry(retry_timeout
=10)
4444 def verify_evpn_routes(
4445 tgen
, topo
, dut
, input_dict
, routeType
=5, EthTag
=0, next_hop
=None, expected
=True
4448 API to verify evpn routes using "sh bgp l2vpn evpn"
4452 * `tgen`: topogen object
4453 * `topo` : json file data
4454 * `dut` : device under test
4455 * `input_dict`: having details like - for which route, rd value
4456 needs to be verified
4457 * `route_type` : Route type 5 is supported as of now
4458 * `EthTag` : Ethernet tag, by-default is 0
4459 * `next_hop` : Prefered nexthop for the evpn routes
4460 * `expected` : expected results from API, by-default True
4467 "network": [NETWORK1_1[addr_type]],
4468 "next_hop": NEXT_HOP_IP[addr_type],
4473 result = verify_evpn_routes(tgen, topo, input_dict)
4476 errormsg(str) or True
4479 logger
.debug("Entering lib API: {}".format(sys
._getframe
().f_code
.co_name
))
4481 for router
in input_dict
.keys():
4482 rnode
= tgen
.routers()[dut
]
4484 logger
.info("[DUT: %s]: Verifying evpn routes: ", dut
)
4486 if "static_routes" in input_dict
[router
]:
4487 for static_route
in input_dict
[router
]["static_routes"]:
4488 network
= static_route
["network"]
4490 if type(network
) is not list:
4494 for route
in network
:
4496 ip_len
= route
.split("/")[1]
4497 route
= route
.split("/")[0]
4499 prefix
= "[{}]:[{}]:[{}]:[{}]".format(
4500 routeType
, EthTag
, ip_len
, route
4503 cmd
= "show bgp l2vpn evpn route json"
4504 evpn_value_json
= run_frr_cmd(rnode
, cmd
, isjson
=True)
4506 if not bool(evpn_value_json
):
4507 errormsg
= "No output for '{}' cli".format(cmd
)
4510 if evpn_value_json
["numPrefix"] == 0:
4511 errormsg
= "[DUT: %s]: No EVPN prefixes exist" % (dut
)
4514 for key
, route_data_json
in evpn_value_json
.items():
4515 if type(route_data_json
) is dict:
4517 if prefix
not in route_data_json
:
4518 missing_routes
[key
] = prefix
4520 if rd_keys
== len(missing_routes
.keys()):
4523 "Missing EVPN routes: "
4524 "%s [FAILED]!!" % (dut
, list(set(missing_routes
.values())))
4528 for key
, route_data_json
in evpn_value_json
.items():
4529 if type(route_data_json
) is dict:
4530 if prefix
not in route_data_json
:
4533 for paths
in route_data_json
[prefix
]["paths"]:
4535 if path
["routeType"] != routeType
:
4538 "Verifying routeType "
4539 "for EVPN route: %s "
4553 for nh_dict
in path
["nexthops"]:
4554 if nh_dict
["ip"] != next_hop
:
4574 "[DUT %s]: Verifying "
4585 logger
.debug("Exiting lib API: {}".format(sys
._getframe
().f_code
.co_name
))
@retry(retry_timeout=10)
def verify_bgp_bestpath(tgen, addr_type, input_dict):
    """
    Verifies bgp next hop values in best-path output

    Runs "show bgp [vrf <vrf>] <addr_type> <network> bestpath json" on every
    DUT named in input_dict and compares the next hop of the first returned
    path with the expected "bestpath" value.

    Parameters
    ----------
    * `tgen` : topogen object
    * `addr_type` : Address type ipv4/ipv6
    * `input_dict`: having details like multipath and bestpath. The entries
                    of input_dict[dut][addr_type] are iterated and each one
                    is read for "network", "bestpath" and optional "vrf".

    Usage
    -----
        # NOTE(review): dict methods are called on every iterated entry, so
        # a list of dicts per address family is assumed - confirm with callers
        input_dict_1 = {
            "r1": {
                "ipv4" : [
                    {
                        "bestpath": "50.0.0.1",
                        "multipath": ["50.0.0.1", "50.0.0.2"],
                        "network": "100.0.0.0/24"
                    }
                ],
                "ipv6" : [
                    {
                        "bestpath": "1000::1",
                        "multipath": ["1000::1", "1000::2"],
                        "network": "2000::1/128"
                    }
                ]
            }
        }

    result = verify_bgp_bestpath(tgen, "ipv4", input_dict_1)

    Returns
    -------
    errormsg(str) on the first mismatch; otherwise True when the last
    checked network of the last DUT carried a matching best path, else False
    """

    result = False
    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
    for dut in input_dict.keys():
        rnode = tgen.routers()[dut]

        logger.info("[DUT: %s]: Verifying bgp bestpath and multipath " "routes:", dut)
        result = False
        for network_dict in input_dict[dut][addr_type]:
            # .get() instead of .setdefault(): reading the expectations must
            # not add keys to the caller's dictionary
            nw_addr = network_dict.get("network")
            vrf = network_dict.get("vrf")
            bestpath = network_dict.get("bestpath")

            if vrf:
                cmd = "show bgp vrf {} {} {} bestpath json".format(
                    vrf, addr_type, nw_addr
                )
            else:
                cmd = "show bgp {} {} bestpath json".format(addr_type, nw_addr)

            data = run_frr_cmd(rnode, cmd, isjson=True)
            route = data["paths"][0]

            if "bestpath" in route:
                # Reset for every network: previously the name stayed unbound
                # (or kept the value of the preceding network) whenever the
                # path was not flagged as the overall best path.
                _bestpath = None
                if route["bestpath"]["overall"] is True:
                    _bestpath = route["nexthops"][0]["ip"]

                if _bestpath != bestpath:
                    return (
                        "DUT:[{}] Bestpath do not match for"
                        " network: {}, Expected "
                        " {} as bgp bestpath found {}".format(
                            dut, nw_addr, bestpath, _bestpath
                        )
                    )

                logger.info(
                    "DUT:[{}] Found expected bestpath: "
                    " {} for network: {}".format(dut, _bestpath, nw_addr)
                )
                result = True

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return result
def verify_tcp_mss(tgen, dut, neighbour, configured_tcp_mss, vrf=None):
    """
    This api is used to verify the tcp-mss value assigned to a neighbour of DUT

    Parameters
    ----------
    * `tgen` : topogen object
    * `dut`: device under test
    * `neighbour`: neighbour IP address, used as key into the CLI json output
    * `configured_tcp_mss`: The TCP-MSS value to be verified
    * `vrf`: optional vrf name; when set "show bgp vrf <vrf> neighbors" is used

    Usage
    -----
    result = verify_tcp_mss(tgen, dut,neighbour,configured_tcp_mss)

    Returns
    -------
    errormsg(str) or True
    """

    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
    rnode = tgen.routers()[dut]
    if vrf:
        cmd = "show bgp vrf {} neighbors {} json".format(vrf, neighbour)
    else:
        cmd = "show bgp neighbors {} json".format(neighbour)

    # Execute the command
    show_vrf_stats = run_frr_cmd(rnode, cmd, isjson=True)

    # Verify TCP-MSS on router
    logger.info("Verify that no core is observed")
    if tgen.routers_have_failure():
        errormsg = "Core observed while running CLI: %s" % (cmd)
        return errormsg

    # The neighbour may be absent from the output (e.g. session not yet
    # configured); report that instead of failing on None.get(...)
    neighbour_stats = None
    if isinstance(show_vrf_stats, dict):
        neighbour_stats = show_vrf_stats.get(neighbour)
    if not isinstance(neighbour_stats, dict):
        errormsg = "Neighbour {} not found in output of CLI: {}".format(
            neighbour, cmd
        )
        return errormsg

    found_tcp_mss = neighbour_stats.get("bgpTcpMssConfigured")
    if configured_tcp_mss != found_tcp_mss:
        logger.debug(
            "TCP-MSS Mismatch ,configured {} expecting {}".format(
                found_tcp_mss,
                configured_tcp_mss,
            )
        )
        return "TCP-MSS Mismatch"

    logger.debug(
        "Configured TCP - MSS Found: {}".format(sys._getframe().f_code.co_name)
    )
    # Previously unreachable: both branches returned before this log line
    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return True
def get_dut_as_number(tgen, dut):
    """
    API to get the Autonomous Number of the given DUT

    The value is read from ["ipv4Unicast"]["as"] of "sh ip bgp summary json".

    params:
    =======
    tgen : topogen object (replaced by get_topogen() inside the function)
    dut : Device Under test

    returns :
    =======
    Success : DUT Autonomous number
    Fail : Boolean False (an error is logged)
    """
    # NOTE: the global topogen instance is used, exactly as before; the
    # tgen argument is kept for interface compatibility only.
    tgen = get_topogen()

    # Direct lookup instead of scanning every router of the topology
    rnode = tgen.routers().get(dut)
    if rnode is None:
        logger.error("ERROR...! Unknown dut {} in topolgy".format(dut))
        return False

    show_bgp_json = run_frr_cmd(rnode, "sh ip bgp summary json ", isjson=True)

    # Tolerate empty output / missing address family instead of raising
    # KeyError, so the documented "False on failure" contract holds.
    as_number = None
    if isinstance(show_bgp_json, dict):
        as_number = show_bgp_json.get("ipv4Unicast", {}).get("as")

    if as_number:
        logger.info(
            "[dut {}] DUT contains Automnomous number :: {} ".format(dut, as_number)
        )
        return as_number

    logger.error(
        "[dut {}] ERROR....! DUT doesnot contain any Automnomous number ".format(dut)
    )
    return False
def get_prefix_count_route(
    tgen, topo, dut, peer, vrf=None, link=None, sent=None, received=None
):
    """
    API to return the prefix count of default originate the given DUT

    The counters are read from the BGP summary json of the DUT for the
    neighbor addresses that `peer` uses on its link towards the DUT.

    tgen : topogen object (replaced by get_topogen() inside the function)
    topo : json file data, used to resolve the neighbor addresses
    dut : Device under test
    peer : neigbor on which you are expecting the route to be received
    vrf : optional vrf name; the vrf summary always reports "pfxRcd"
    link : optional link name on the peer; defaults to the link named `dut`
    sent : when truthy (and `received` is not) report "pfxSnt"
    received : when truthy report "pfxRcd" (also the default)

    returns :
        prefix_count as dict with ipv4 and ipv6 value
        ("ipv4_count" / "ipv6_count"); None when the dut is unknown
    """
    # the neighbor IP address can be accessable by finding the neigborship (vice-versa)
    peer_link = topo["routers"][peer]["links"][link if link else dut]
    neighbor_ipv4_address = peer_link["ipv4"].split("/")[0]
    neighbor_ipv6_address = peer_link["ipv6"].split("/")[0]

    prefix_count = {}
    # NOTE: the global topogen instance is used, exactly as before
    tgen = get_topogen()

    # Look the DUT up directly. The former loop logged "Unknown dut" once for
    # every *other* router of the topology, even when the DUT existed.
    rnode = tgen.routers().get(dut)
    if rnode is None:
        logger.error("ERROR...! Unknown dut {} in topolgy".format(dut))
        return None

    if vrf:
        ipv4_cmd = "sh ip bgp vrf {} summary json".format(vrf)
        show_bgp_json_ipv4 = run_frr_cmd(rnode, ipv4_cmd, isjson=True)
        ipv6_cmd = "sh ip bgp vrf {} ipv6 unicast summary json".format(vrf)
        show_bgp_json_ipv6 = run_frr_cmd(rnode, ipv6_cmd, isjson=True)

        prefix_count["ipv4_count"] = show_bgp_json_ipv4["ipv4Unicast"]["peers"][
            neighbor_ipv4_address
        ]["pfxRcd"]
        prefix_count["ipv6_count"] = show_bgp_json_ipv6["peers"][
            neighbor_ipv6_address
        ]["pfxRcd"]

        logger.info(
            "The Prefix Count of the [DUT:{} : vrf [{}] ] towards neighbor ipv4 : {} and ipv6 : {} is : {}".format(
                dut,
                vrf,
                neighbor_ipv4_address,
                neighbor_ipv6_address,
                prefix_count,
            )
        )
        return prefix_count

    show_bgp_json_ipv4 = run_frr_cmd(rnode, "sh ip bgp summary json ", isjson=True)
    show_bgp_json_ipv6 = run_frr_cmd(
        rnode, "sh ip bgp ipv6 unicast summary json ", isjson=True
    )

    # "received" wins over "sent"; with neither flag the received count is
    # reported (the two former branches for that were identical).
    count_key = "pfxSnt" if (sent and not received) else "pfxRcd"
    prefix_count["ipv4_count"] = show_bgp_json_ipv4["ipv4Unicast"]["peers"][
        neighbor_ipv4_address
    ][count_key]
    prefix_count["ipv6_count"] = show_bgp_json_ipv6["peers"][neighbor_ipv6_address][
        count_key
    ]

    logger.info(
        "The Prefix Count of the DUT:{} towards neighbor ipv4 : {} and ipv6 : {} is : {}".format(
            dut, neighbor_ipv4_address, neighbor_ipv6_address, prefix_count
        )
    )
    return prefix_count
def _default_route_attr_ok(expected, obtained, passed_msg, failed_msg):
    """Log and report whether one expected default-route attribute matches.

    An unset (falsy) expectation counts as satisfied and logs nothing.
    """
    if not expected:
        return True
    if expected == obtained:
        logger.info(passed_msg)
        return True
    logger.error(failed_msg)
    return False


def _default_route_aspath_ok(expected_aspath, path_entry, route):
    """Log and report whether expected_aspath is contained in the path's AS path.

    An unset (falsy) expectation counts as satisfied and logs nothing.
    """
    if not expected_aspath:
        return True
    obtained_aspath = path_entry["path"]
    if expected_aspath in obtained_aspath:
        logger.info(
            "Dafault Route {} expected AS path {} Found in RIB....PASSED".format(
                route, expected_aspath
            )
        )
        return True
    logger.error(
        "ERROR.....! Expected AS path {} obtained {}..... FAILED ".format(
            expected_aspath, obtained_aspath
        )
    )
    return False


@retry(retry_timeout=5)
def verify_rib_default_route(
    tgen,
    topo,
    dut,
    routes,
    expected_nexthop,
    metric=None,
    origin=None,
    locPrf=None,
    expected_aspath=None,
):
    """
    API to verify the 'Default route' in BGP RIB with the attributes the
    route carries (metric, local preference, origin, AS path)

    Reads "sh ip bgp json" and "sh ip bgp ipv6 unicast json" from the DUT.
    The IPv4 default route is routes["ipv4"]; for IPv6 the key "0::0/0" is
    preferred and "::/0" is the fallback. Attributes are taken from the
    first path of each route.

    param
    =====
    tgen : topogen object (replaced by get_topogen() inside the function)
    topo : json file data (not used by this API)
    dut : device under test
    routes : default route with expected nexthop
    expected_nexthop : the nexthop that is expected the default route,
                       dict with "ipv4" and/or "ipv6" keys
    metric / origin / locPrf : expected attribute values; only verified
                               when truthy
    expected_aspath : expected AS path, checked with "in" against the
                      obtained path; only verified when truthy

    returns
    =======
    True when every requested check passes for IPv4 and IPv6, else False
    """
    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
    # NOTE: the global topogen instance is used, exactly as before
    tgen = get_topogen()

    # An unknown DUT used to leave the route tables unbound (NameError)
    rnode = tgen.routers().get(dut)
    if rnode is None:
        logger.error("ERROR...! Unknown dut {} in topolgy".format(dut))
        return False

    ipv4_routes = run_frr_cmd(rnode, "sh ip bgp json", isjson=True)
    ipv6_routes = run_frr_cmd(rnode, "sh ip bgp ipv6 unicast json", isjson=True)

    # ------------------------------ IPv4 ------------------------------
    default_ipv4_route = routes["ipv4"]
    ipv4_table = ipv4_routes["routes"]
    if default_ipv4_route not in ipv4_table:
        logger.error("ERROR [ DUT {}] : The Default Route Not found in RIB".format(dut))
        return False

    ipv4_paths = ipv4_table[default_ipv4_route]
    rib_next_hops = [path["nexthops"][0]["ip"] for path in ipv4_paths]

    if "ipv4" in expected_nexthop:
        if expected_nexthop["ipv4"] in rib_next_hops:
            logger.info(
                "Default routes [{}] obtained from {} .....PASSED".format(
                    default_ipv4_route, expected_nexthop["ipv4"]
                )
            )
        else:
            logger.error(
                "ERROR ...! Default routes [{}] expected is missing {}".format(
                    default_ipv4_route, expected_nexthop["ipv4"]
                )
            )
            return False

    # Attributes absent from the json keep the legacy placeholder False
    ipv4_first_path = ipv4_paths[0]
    ipv4_route_Origin = ipv4_first_path.get("origin", False)
    ipv4_route_local_pref = ipv4_first_path.get("locPrf", False)
    ipv4_route_metric = ipv4_first_path.get("metric", False)

    if not _default_route_attr_ok(
        origin,
        ipv4_route_Origin,
        "Dafault Route {} expected origin {} Found in RIB....PASSED".format(
            default_ipv4_route, origin
        ),
        "ERROR... IPV4::! Expected Origin is {} obtained {}".format(
            origin, ipv4_route_Origin
        ),
    ):
        return False

    if not _default_route_attr_ok(
        locPrf,
        ipv4_route_local_pref,
        "Dafault Route {} expected local preference {} Found in RIB....PASSED".format(
            default_ipv4_route, locPrf
        ),
        "ERROR... IPV4::! Expected Local preference is {} obtained {}".format(
            locPrf, ipv4_route_local_pref
        ),
    ):
        return False

    if not _default_route_attr_ok(
        metric,
        ipv4_route_metric,
        "Dafault Route {} expected metric {} Found in RIB....PASSED".format(
            default_ipv4_route, metric
        ),
        "ERROR... IPV4::! Expected metric is {} obtained {}".format(
            metric, ipv4_route_metric
        ),
    ):
        return False

    # The AS path is read from the route that was actually verified; the
    # former hard-coded "0.0.0.0/0" key raised KeyError for other prefixes.
    if not _default_route_aspath_ok(
        expected_aspath, ipv4_first_path, default_ipv4_route
    ):
        return False

    # Every failed check returned above, so the former "not all attributes
    # found" branch could never run and has been dropped.
    logger.info(
        "IPV4:: Expected origin ['{}'] , Local Preference ['{}'] , Metric ['{}'] and AS path [{}] is found in RIB".format(
            origin, locPrf, metric, expected_aspath
        )
    )

    # ------------------------------ IPv6 ------------------------------
    ipv6_table = ipv6_routes["routes"]
    # Explicit membership tests replace the former bare "except:" probing
    if "0::0/0" in ipv6_table:
        default_ipv6_route = "0::0/0"
    elif "::/0" in ipv6_table:
        default_ipv6_route = "::/0"
    else:
        logger.error("ERROR [ DUT {}] : The Default Route Not found in RIB".format(dut))
        return False

    ipv6_paths = ipv6_table[default_ipv6_route]
    rib_next_hops = []
    for path in ipv6_paths:
        rib_next_hops.append(path["nexthops"][0]["ip"])
        # A second nexthop entry is optional; its absence is only logged
        try:
            rib_next_hops.append(path["nexthops"][1]["ip"])
        except (KeyError, IndexError):
            logger.error("NO impact ..! Global IPV6 Address not found ")

    if "ipv6" in expected_nexthop:
        if expected_nexthop["ipv6"] in rib_next_hops:
            logger.info(
                "Default routes [{}] obtained from {} .....PASSED".format(
                    default_ipv6_route, expected_nexthop["ipv6"]
                )
            )
        else:
            logger.error(
                "ERROR ...! Default routes [{}] expected from {} obtained {}".format(
                    default_ipv6_route, expected_nexthop["ipv6"], rib_next_hops
                )
            )
            return False

    ipv6_first_path = ipv6_paths[0]
    route_Origin = ipv6_first_path.get("origin", False)
    route_local_pref = ipv6_first_path.get("locPrf", False)
    route_local_metric = ipv6_first_path.get("metric", False)

    if not _default_route_attr_ok(
        origin,
        route_Origin,
        "Dafault Route {} expected origin {} Found in RIB....PASSED".format(
            default_ipv6_route, route_Origin
        ),
        "ERROR... IPV6::! Expected Origin is {} obtained {}".format(
            origin, route_Origin
        ),
    ):
        return False

    if not _default_route_attr_ok(
        locPrf,
        route_local_pref,
        "Dafault Route {} expected Local Preference {} Found in RIB....PASSED".format(
            default_ipv6_route, route_local_pref
        ),
        "ERROR... IPV6::! Expected Local Preference is {} obtained {}".format(
            locPrf, route_local_pref
        ),
    ):
        return False

    # Fixed: the IPv6 PASSED messages for metric and AS path used to print
    # the IPv4 default route.
    if not _default_route_attr_ok(
        metric,
        route_local_metric,
        "Dafault Route {} expected metric {} Found in RIB....PASSED".format(
            default_ipv6_route, metric
        ),
        "ERROR... IPV6::! Expected metric is {} obtained {}".format(
            metric, route_local_metric
        ),
    ):
        return False

    # Fixed: the AS path is read from the selected IPv6 key, not always "::/0"
    if not _default_route_aspath_ok(
        expected_aspath, ipv6_first_path, default_ipv6_route
    ):
        return False

    logger.info(
        "IPV6:: Expected origin ['{}'] , Local Preference ['{}'] , Metric ['{}'] and AS path [{}] is found in RIB".format(
            origin, locPrf, metric, expected_aspath
        )
    )

    logger.info("The attributes are found for default route in RIB ")
    return True
@retry(retry_timeout=5)
def verify_fib_default_route(tgen, topo, dut, routes, expected_nexthop):
    """
    API to verify the 'Default route' in FIB

    Reads "sh ip route json" and "sh ipv6 route json" from the DUT and checks
    that the expected next hop is among the nexthops of the first entry of
    the default route. For IPv6 the key "::/0" is preferred over
    routes["ipv6"] when both are present.

    param
    =====
    tgen : topogen object (replaced by get_topogen() inside the function)
    topo : json file data (not used by this API)
    dut : device under test
    routes : default route with expected nexthop ("ipv4" / "ipv6" keys)
    expected_nexthop : the nexthop that is expected the default route
                       ("ipv4" / "ipv6" keys)

    returns
    =======
    True when both the IPv4 and the IPv6 default route are found with the
    expected next hop, else False
    """
    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
    # NOTE: the global topogen instance is used, exactly as before
    tgen = get_topogen()

    # An unknown DUT used to leave the route tables unbound (NameError)
    rnode = tgen.routers().get(dut)
    if rnode is None:
        logger.error("ERROR...! Unknown dut {} in topolgy".format(dut))
        return False

    ipv4_routes = run_frr_cmd(rnode, "sh ip route json", isjson=True)
    ipv6_routes = run_frr_cmd(rnode, "sh ipv6 route json", isjson=True)

    is_ipv4_default_route_found = False
    is_ipv6_default_route_found = False

    if routes["ipv4"] in ipv4_routes:
        ipv4_default_route = routes["ipv4"]
        # Nexthops without an "ip" member are ignored instead of raising
        rib_ipv4_nxt_hops = [
            nexthop["ip"]
            for nexthop in ipv4_routes[ipv4_default_route][0]["nexthops"]
            if "ip" in nexthop
        ]

        if expected_nexthop["ipv4"] in rib_ipv4_nxt_hops:
            is_ipv4_default_route_found = True
            logger.info(
                "{} default route with next hop {} is found in FIB ".format(
                    ipv4_default_route, expected_nexthop
                )
            )
        else:
            logger.error(
                "ERROR .. ! {} default route with next hop {} is not found in FIB ".format(
                    ipv4_default_route, expected_nexthop
                )
            )
            return False

    if routes["ipv6"] in ipv6_routes or "::/0" in ipv6_routes:
        # "::/0" takes precedence over the caller supplied prefix
        if "::/0" in ipv6_routes:
            ipv6_default_route = "::/0"
        else:
            ipv6_default_route = routes["ipv6"]

        rib_ipv6_nxt_hops = [
            nexthop["ip"]
            for nexthop in ipv6_routes[ipv6_default_route][0]["nexthops"]
            if "ip" in nexthop
        ]

        if expected_nexthop["ipv6"] in rib_ipv6_nxt_hops:
            is_ipv6_default_route_found = True
            logger.info(
                "{} default route with next hop {} is found in FIB ".format(
                    ipv6_default_route, expected_nexthop
                )
            )
        else:
            logger.error(
                "ERROR .. ! {} default route with next hop {} is not found in FIB ".format(
                    ipv6_default_route, expected_nexthop
                )
            )
            return False

    if is_ipv4_default_route_found and is_ipv6_default_route_found:
        return True

    logger.error(
        "Default Route for ipv4 and ipv6 address family is not found in FIB "
    )
    return False
@retry(retry_timeout=5)
def verify_bgp_advertised_routes_from_neighbor(tgen, topo, dut, peer, expected_routes):
    """
    API to verify the routes that are advertised from dut to peer

    command used :
    "sh ip bgp neighbor <x.x.x.x> advertised-routes" and
    "sh ip bgp ipv6 unicast neighbor<x::x> advertised-routes"

    A route counts as advertised when its network is a key of
    "advertisedRoutes" or is contained in "bgpOriginatingDefaultNetwork".

    tgen : topogen object (replaced by get_topogen() inside the function)
    topo : json file data, used to resolve the peer addresses
    dut : Device Under Tests
    Peer : Peer on which the routes are expected
    expected_routes : dual stack IPV4-and IPv6 routes to be verified,
                      {"ipv4": [{"network": ...}], "ipv6": [{"network": ...}]}

    returns: True / False

    """
    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
    # NOTE: the global topogen instance is used, exactly as before
    tgen = get_topogen()

    peer_ipv4_neighbor_ip = topo["routers"][peer]["links"][dut]["ipv4"].split("/")[0]
    peer_ipv6_neighbor_ip = topo["routers"][peer]["links"][dut]["ipv6"].split("/")[0]

    # An unknown DUT used to leave the command outputs unbound (NameError)
    rnode = tgen.routers().get(dut)
    if rnode is None:
        logger.error("ERROR...! Unknown dut {} in topolgy".format(dut))
        return False

    ipv4_receieved_routes = run_frr_cmd(
        rnode,
        "sh ip bgp neighbor {} advertised-routes json".format(peer_ipv4_neighbor_ip),
        isjson=True,
    )
    ipv6_receieved_routes = run_frr_cmd(
        rnode,
        "sh ip bgp ipv6 unicast neighbor {} advertised-routes json".format(
            peer_ipv6_neighbor_ip
        ),
        isjson=True,
    )

    outputs = (
        ("ipv4", "IPV4", ipv4_receieved_routes),
        ("ipv6", "IPV6", ipv6_receieved_routes),
    )
    all_advertised = True
    for family, label, output in outputs:
        if not output:
            logger.error(output)
            logger.error(
                "ERROR...! [DUT : {}] No {} Routes are advertised to the peer {}".format(
                    dut, label, peer
                )
            )
            return False

        # Both members are optional in the json; a missing one used to raise
        # KeyError instead of reporting the route as not advertised.
        advertised = output.get("advertisedRoutes", {})
        default_network = output.get("bgpOriginatingDefaultNetwork", "")

        for route in expected_routes[family]:
            network = route["network"]
            if network in advertised or network in default_network:
                logger.info(
                    "Success [DUT : {}] The Expected Route {} is advertised to {} ".format(
                        dut, network, peer
                    )
                )
            else:
                all_advertised = False
                logger.error(
                    "ERROR....![DUT : {}] The Expected Route {} is not advertised to {} ".format(
                        dut, network, peer
                    )
                )

    if all_advertised:
        return True

    for family, label, output in outputs:
        logger.error(
            "ERROR ....! {} : Expected Routes -> {} obtained ->{} ".format(
                label, expected_routes[family], output.get("advertisedRoutes")
            )
        )
    return False
@retry(retry_timeout=5)
def verify_bgp_received_routes_from_neighbor(tgen, topo, dut, peer, expected_routes):
    """
    API to verify the bgp received routes

    commad used :
    =============
        show ip bgp neighbor <x.x.x.x> received-routes
        show ip bgp ipv6 unicast neighbor <x::x> received-routes

    Inbound soft-reconfiguration is enabled towards the peer first, then the
    expected networks are looked up in the "receivedRoutes" member of both
    command outputs.

    params
    =======
    tgen : topogen object (replaced by get_topogen() inside the function)
    topo : json file data, used to resolve the peer addresses
    dut : Device Under Tests
    Peer : Peer on which the routes are expected
    expected_routes : dual stack IPV4-and IPv6 routes to be verified,
                      {"ipv4": [{"network": ...}], "ipv6": [{"network": ...}]}

    returns:
    ========
        True / False

    raises:
    =======
        AssertionError when the soft configuration could not be applied
    """
    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
    # NOTE: the global topogen instance is used, exactly as before
    tgen = get_topogen()

    peer_ipv4_neighbor_ip = topo["routers"][peer]["links"][dut]["ipv4"].split("/")[0]
    peer_ipv6_neighbor_ip = topo["routers"][peer]["links"][dut]["ipv6"].split("/")[0]

    logger.info("Enabling Soft configuration to neighbor INBOUND ")
    neigbor_dict = {"ipv4": peer_ipv4_neighbor_ip, "ipv6": peer_ipv6_neighbor_ip}
    result = configure_bgp_soft_configuration(
        tgen, dut, neigbor_dict, direction="inbound"
    )
    assert (
        result is True
    ), " Failed to configure the soft configuration \n Error: {}".format(result)

    # sleep of 10 sec is required to get the routes on peer after soft configuration
    sleep(10)

    # An unknown DUT used to leave the command outputs unbound (NameError)
    rnode = tgen.routers().get(dut)
    if rnode is None:
        logger.error("ERROR...! Unknown dut {} in topolgy".format(dut))
        return False

    ipv4_receieved_routes = run_frr_cmd(
        rnode,
        "sh ip bgp neighbor {} received-routes json".format(peer_ipv4_neighbor_ip),
        isjson=True,
    )
    ipv6_receieved_routes = run_frr_cmd(
        rnode,
        "sh ip bgp ipv6 unicast neighbor {} received-routes json".format(
            peer_ipv6_neighbor_ip
        ),
        isjson=True,
    )

    outputs = (
        ("ipv4", "IPV4", ipv4_receieved_routes),
        ("ipv6", "IPV6", ipv6_receieved_routes),
    )
    all_received = True
    for family, label, output in outputs:
        if not output:
            logger.error(output)
            logger.error(
                "ERROR...! [DUT : {}] No {} Routes are received from the peer {}".format(
                    dut, label, peer
                )
            )
            return False

        received = output.get("receivedRoutes", {})
        for route in expected_routes[family]:
            network = route["network"]
            if network in received:
                logger.info(
                    "Success [DUT : {}] The Expected Route {} is received from {} ".format(
                        dut, network, peer
                    )
                )
            else:
                all_received = False
                logger.error(
                    "ERROR....![DUT : {}] The Expected Route {} is not received from {} ".format(
                        dut, network, peer
                    )
                )

    if all_received:
        return True

    # Fixed: this failure report used to read the "advertisedRoutes" member,
    # which received-routes output does not carry, so a plain mismatch ended
    # in KeyError instead of returning False.
    for family, label, output in outputs:
        logger.error(
            "ERROR ....! {} : Expected Routes -> {} obtained ->{} ".format(
                label, expected_routes[family], output.get("receivedRoutes")
            )
        )
    return False
5548 def configure_bgp_soft_configuration(tgen
, dut
, neighbor_dict
, direction
):
5550 Api to configure the bgp soft configuration to show the received routes from peer
5553 dut : device under test route on which the sonfiguration to be applied
5554 neighbor_dict : dict element contains ipv4 and ipv6 neigbor ip
5555 direction : Directionon which it should be applied in/out
5561 logger
.info("Enabling Soft configuration to neighbor INBOUND ")
5562 local_as
= get_dut_as_number(tgen
, dut
)
5563 ipv4_neighbor
= neighbor_dict
["ipv4"]
5564 ipv6_neighbor
= neighbor_dict
["ipv6"]
5565 direction
= direction
.lower()
5566 if ipv4_neighbor
and ipv4_neighbor
:
5570 "router bgp {}".format(local_as
),
5571 "address-family ipv4 unicast",
5572 "neighbor {} soft-reconfiguration {} ".format(
5573 ipv4_neighbor
, direction
5575 "exit-address-family",
5576 "address-family ipv6 unicast",
5577 "neighbor {} soft-reconfiguration {} ".format(
5578 ipv6_neighbor
, direction
5580 "exit-address-family",
5584 result
= apply_raw_config(tgen
, raw_config
)
5586 "Success... [DUT : {}] The soft configuration onis applied on neighbors {} ".format(