2 # Copyright (c) 2019 by VMware, Inc. ("VMware")
3 # Used Copyright (c) 2018 by Network Device Education Foundation, Inc.
4 # ("NetDEF") in this file.
6 # Permission to use, copy, modify, and/or distribute this software
7 # for any purpose with or without fee is hereby granted, provided
8 # that the above copyright notice and this permission notice appear
11 # THE SOFTWARE IS PROVIDED "AS IS" AND VMWARE DISCLAIMS ALL WARRANTIES
12 # WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
13 # MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL VMWARE BE LIABLE FOR
14 # ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY
15 # DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS,
16 # WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
17 # ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE
24 from copy
import deepcopy
25 from time
import sleep
27 # Import common_config to use commonly used APIs
28 from lib
.common_config
import (
29 create_common_configurations
,
34 find_interface_with_greater_ip
,
36 get_frr_ipv6_linklocal
,
41 from lib
.topogen
import get_topogen
42 from lib
.topolog
import logger
43 from lib
.topotest
import frr_unicode
45 from lib
import topotest
def create_router_bgp(tgen, topo=None, input_dict=None, build=False, load_config=True):
    """
    API to configure bgp on router

    For every router entry that carries a "bgp" section, the global BGP
    commands are generated first, then the unicast and l2vpn/evpn
    address-family commands, and the collected command lists are handed to
    create_common_configurations() in a single call.

    Parameters
    ----------
    * `tgen` : Topogen object
    * `topo` : json file data; when None, `tgen.json_topo` is used
    * `input_dict` : Input dict data, required when configuring from testcase.
                     When empty, a deep copy of `topo` is used instead.
    * `build` : Only for initial setup phase this is set as True.
    * `load_config` : passed through to create_common_configurations()

    Usage
    -----
    input_dict = {
        "r1": {
            "bgp": {
                "router_id": "22.22.22.22",
                "graceful-restart": {
                    "graceful-restart": True,
                    "preserve-fw-state": True,
                    "timer": {"select-defer-time": 30},
                },
                "address_family": {
                    "ipv4": {
                        "unicast": {
                            "redistribute": [
                                {"redist_type": "static"},
                                {"redist_type": "connected"},
                            ],
                            "advertise_networks": [
                                {"network": "20.0.0.0/32"},
                                {"network": "30.0.0.0/32"},
                            ],
                        }
                    }
                },
            }
        }
    }

    Returns
    -------
    Result of create_common_configurations(), or False when it raises
    InvalidCLIError.
    """
    # NOTE(review): the listing this block was restored from had lost its
    # short lines (else/return/if); confirm the branch structure upstream.
    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))

    if topo is None:
        topo = tgen.json_topo

    # Flag is used when testing ipv6 over ipv4 or vice-versa
    afi_test = False

    if not input_dict:
        input_dict = deepcopy(topo)
    else:
        topo = topo["routers"]
        # Work on a private copy: the helpers below fill in defaults with
        # setdefault() and must not leak them into the caller's dict.
        input_dict = deepcopy(input_dict)

    config_data_dict = {}

    for router, router_data in input_dict.items():
        if "bgp" not in router_data:
            logger.debug("Router %s: 'bgp' not present in input_dict", router)
            continue

        # A router may carry one BGP instance (dict) or several (list).
        bgp_data_list = router_data["bgp"]
        if not isinstance(bgp_data_list, list):
            bgp_data_list = [bgp_data_list]

        config_data = []

        for bgp_data in bgp_data_list:
            data_all_bgp = __create_bgp_global(tgen, bgp_data, router, build)
            if not data_all_bgp:
                # Nothing usable came back from the global section, so there
                # is no "router bgp" context to hang address families on.
                continue

            bgp_addr_data = bgp_data.setdefault("address_family", {})

            if not bgp_addr_data:
                logger.debug(
                    "Router %s: 'address_family' not present in "
                    "input_dict for BGP",
                    router,
                )
            else:
                ipv4_data = bgp_addr_data.setdefault("ipv4", {})
                ipv6_data = bgp_addr_data.setdefault("ipv6", {})
                l2vpn_data = bgp_addr_data.setdefault("l2vpn", {})

                # Both setdefault() calls must run so later helpers can rely
                # on the "unicast" key; hence no short-circuit `or` here.
                ipv4_unicast = ipv4_data.setdefault("unicast", {})
                ipv6_unicast = ipv6_data.setdefault("unicast", {})
                neigh_unicast = bool(ipv4_unicast or ipv6_unicast)

                l2vpn_evpn = bool(l2vpn_data.setdefault("evpn", {}))

                if neigh_unicast:
                    data_all_bgp = __create_bgp_unicast_neighbor(
                        tgen,
                        topo,
                        bgp_data,
                        router,
                        afi_test,
                        config_data=data_all_bgp,
                    )

                if l2vpn_evpn:
                    data_all_bgp = __create_l2vpn_evpn_address_family(
                        tgen, topo, bgp_data, router, config_data=data_all_bgp
                    )

            if data_all_bgp:
                config_data.extend(data_all_bgp)

        if config_data:
            config_data_dict[router] = config_data

    try:
        result = create_common_configurations(
            tgen, config_data_dict, "bgp", build, load_config
        )
    except InvalidCLIError:
        logger.error("create_router_bgp", exc_info=True)
        result = False

    logger.debug("Exiting lib API: create_router_bgp()")
    return result
def __create_bgp_global(tgen, input_dict, router, build=False):
    """
    Helper API to create bgp global configuration.

    Parameters
    ----------
    * `tgen` : Topogen object
    * `input_dict` : Input dict data, required when configuring from testcase
    * `router` : router id to be configured.
    * `build` : Only for initial setup phase this is set as True.

    Returns
    -------
    list of config commands, or False when `build` is set and 'local_as'
    is missing from `input_dict`
    """
    # NOTE(review): restored from a listing that had lost its short lines
    # (if/else/return); confirm the early-return values upstream.
    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))

    bgp_data = input_dict
    del_bgp_action = bgp_data.setdefault("delete", False)

    config_data = []

    if "local_as" not in bgp_data and build:
        logger.debug(
            "Router %s: 'local_as' not present in input_dict" "for BGP", router
        )
        return False

    local_as = bgp_data.setdefault("local_as", "")
    cmd = "router bgp {}".format(local_as)
    vrf_id = bgp_data.setdefault("vrf", None)
    if vrf_id:
        cmd = "{} vrf {}".format(cmd, vrf_id)

    if del_bgp_action:
        # Removing the whole instance: nothing else is meaningful after it.
        config_data.append("no {}".format(cmd))
        return config_data

    config_data.append(cmd)
    config_data.append("no bgp ebgp-requires-policy")

    router_id = bgp_data.setdefault("router_id", None)
    del_router_id = bgp_data.setdefault("del_router_id", False)
    if del_router_id:
        config_data.append("no bgp router-id")
    if router_id:
        config_data.append("bgp router-id {}".format(router_id))

    config_data.append("bgp log-neighbor-changes")
    config_data.append("no bgp network import-check")

    bgp_peer_grp_data = bgp_data.setdefault("peer-group", {})
    if bgp_peer_grp_data:
        # The first argument is passed exactly as before; the callee only
        # reads the peer-group dict.
        peer_grp_data = __create_bgp_peer_group(tgen, bgp_peer_grp_data, router)
        config_data.extend(peer_grp_data)

    bst_path = bgp_data.setdefault("bestpath", None)
    if bst_path and "aspath" in bst_path:
        bestpath_cmd = "bgp bestpath as-path {}".format(bst_path["aspath"])
        # Presence of the "delete" key (not its value) selects the "no" form.
        if "delete" in bst_path:
            bestpath_cmd = "no {}".format(bestpath_cmd)
        config_data.append(bestpath_cmd)

    if "graceful-restart" in bgp_data:
        graceful_config = bgp_data["graceful-restart"]

        # Each flag is tri-state: None -> leave alone, truthy -> enable,
        # otherwise -> "no" form. Building the command per flag avoids
        # re-appending whatever `cmd` happened to hold from an earlier step.
        for flag, command in (
            ("graceful-restart", "bgp graceful-restart"),
            ("graceful-restart-disable", "bgp graceful-restart-disable"),
            ("preserve-fw-state", "bgp graceful-restart preserve-fw-state"),
            ("disable-eor", "bgp graceful-restart disable-eor"),
        ):
            value = graceful_config.setdefault(flag, None)
            if value is None:
                continue
            config_data.append(command if value else "no {}".format(command))

        timer = graceful_config.get("timer") or {}
        del_action = timer.get("delete", False)
        for rs_timer, rs_timer_value in timer.items():
            # "delete" is a modifier for the other entries, not a timer.
            if rs_timer == "delete" or not rs_timer_value:
                continue
            cmd = "bgp graceful-restart {} {}".format(rs_timer, rs_timer_value)
            if del_action:
                cmd = "no {}".format(cmd)
            config_data.append(cmd)

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))

    return config_data
def __create_bgp_unicast_neighbor(
    tgen, topo, input_dict, router, afi_test, config_data=None
):
    """
    Helper API to create configuration for address-family unicast

    Parameters
    ----------
    * `tgen` : Topogen object
    * `topo` : json file data
    * `input_dict` : Input dict data, required when configuring from testcase
    * `router` : router id to be configured.
    * `afi_test` : use when ipv6 needs to be tested over ipv4 or vice-versa
    * `config_data` : command list to extend (a new list when None)

    Returns
    -------
    list of config commands (the same list object as `config_data` when one
    was supplied)
    """
    # NOTE(review): restored from a listing that had lost its short lines
    # (if/else/continue/try); confirm the branch structure upstream.
    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))

    if config_data is None:
        config_data = []

    # Exact-element test kept from the original: it only matches a list
    # item that is literally "router bgp".
    add_neigh = "router bgp" not in config_data

    bgp_data = input_dict["address_family"]

    for addr_type, addr_dict in bgp_data.items():
        if not addr_dict:
            continue

        if not check_address_types(addr_type) and not afi_test:
            continue

        addr_data = addr_dict["unicast"]
        if addr_data:
            config_data.append("address-family {} unicast".format(addr_type))

        # --- network statements ------------------------------------------
        for advertise_network_dict in addr_data.setdefault("advertise_networks", []):
            network = advertise_network_dict["network"]
            if not isinstance(network, list):
                network = [network]

            no_of_network = advertise_network_dict.get("no_of_network", 1)
            del_action = advertise_network_dict.setdefault("delete", False)

            # Generating IPs for verification
            for ip in generate_ips(network, no_of_network):
                ip = str(ipaddress.ip_network(frr_unicode(ip)))

                cmd = "network {}".format(ip)
                if del_action:
                    cmd = "no {}".format(cmd)

                config_data.append(cmd)

        # --- maximum-paths -----------------------------------------------
        max_paths = addr_data.setdefault("maximum_paths", {})
        if max_paths:
            ibgp = max_paths.setdefault("ibgp", None)
            ebgp = max_paths.setdefault("ebgp", None)
            prefix = "no " if max_paths.setdefault("delete", False) else ""
            if ibgp:
                config_data.append("{}maximum-paths ibgp {}".format(prefix, ibgp))
            if ebgp:
                config_data.append("{}maximum-paths {}".format(prefix, ebgp))

        # --- aggregate-address -------------------------------------------
        for aggregate_address in addr_data.setdefault("aggregate_address", []):
            network = aggregate_address.setdefault("network", None)
            if not network:
                logger.debug(
                    "Router %s: 'network' not present in " "input_dict for BGP", router
                )
                continue

            cmd = "aggregate-address {}".format(network)
            as_set = aggregate_address.setdefault("as_set", False)
            summary = aggregate_address.setdefault("summary", False)
            del_action = aggregate_address.setdefault("delete", False)
            if as_set:
                cmd = "{} as-set".format(cmd)
            if summary:
                cmd = "{} summary".format(cmd)
            if del_action:
                cmd = "no {}".format(cmd)

            config_data.append(cmd)

        # --- redistribute ------------------------------------------------
        for redistribute in addr_data.setdefault("redistribute", {}):
            if "redist_type" not in redistribute:
                logger.debug(
                    "Router %s: 'redist_type' not present in " "input_dict", router
                )
                continue

            cmd = "redistribute {}".format(redistribute["redist_type"])
            redist_attr = redistribute.setdefault("attribute", None)
            if redist_attr:
                if isinstance(redist_attr, dict):
                    for key, value in redist_attr.items():
                        cmd = "{} {} {}".format(cmd, key, value)
                else:
                    cmd = "{} {}".format(cmd, redist_attr)

            if redistribute.setdefault("delete", False):
                cmd = "no {}".format(cmd)
            config_data.append(cmd)

        # --- administrative distance -------------------------------------
        admin_dist_data = addr_data.setdefault("distance", {})
        if admin_dist_data:
            if len(admin_dist_data) < 2:
                logger.debug(
                    "Router %s: pass the admin distance values for "
                    "ebgp, ibgp and local routes",
                    router,
                )
            # All three keys are required; a missing one raises KeyError,
            # which is left to the caller as before.
            cmd = "distance bgp {} {} {}".format(
                admin_dist_data["ebgp"],
                admin_dist_data["ibgp"],
                admin_dist_data["local"],
            )

            if admin_dist_data.setdefault("delete", False):
                cmd = "no distance bgp"
            config_data.append(cmd)

        # --- import vrf --------------------------------------------------
        # Two separate passes used to handle "import" and could emit the
        # same command twice; one pass gives the same resulting config.
        import_vrf_data = addr_data.setdefault("import", {})
        if import_vrf_data:
            cmd = "import vrf {}".format(import_vrf_data["vrf"])
            if import_vrf_data.setdefault("delete", False):
                cmd = "no {}".format(cmd)
            config_data.append(cmd)

        if "neighbor" in addr_data:
            neigh_data = __create_bgp_neighbor(
                topo, input_dict, router, addr_type, add_neigh
            )
            config_data.extend(neigh_data)

        # configure default originate
        if "default_originate" in addr_data:
            default_originate_config = __create_bgp_default_originate_neighbor(
                topo, input_dict, router, addr_type, add_neigh
            )
            config_data.extend(default_originate_config)

    # Second pass: per-neighbor address-family options are emitted only
    # after every neighbor of every family has been declared above.
    for addr_type, addr_dict in bgp_data.items():
        if not addr_dict or not check_address_types(addr_type):
            continue

        addr_data = addr_dict["unicast"]
        if "neighbor" in addr_data:
            neigh_addr_data = __create_bgp_unicast_address_family(
                topo, input_dict, router, addr_type, add_neigh
            )
            config_data.extend(neigh_addr_data)

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))

    return config_data
def __create_bgp_default_originate_neighbor(
    topo, input_dict, router, addr_type, add_neigh=True
):
    """
    Helper API to create neighbor default - originate configuration

    Parameters
    ----------
    * `topo` : json file data
    * `input_dict` : Input dict data, required when configuring from testcase
    * `router` : router id to be configured
    * `addr_type` : address family key used to pick the peer address
    * `add_neigh` : accepted for signature parity with the sibling helpers;
                    not used here

    Returns
    -------
    list of config commands
    """
    # NOTE(review): restored from a listing that had lost its short lines;
    # confirm against upstream.
    config_data = []
    logger.debug("Entering lib API: __create_bgp_default_originate_neighbor()")

    bgp_data = input_dict["address_family"]
    neigh_data = bgp_data[addr_type]["unicast"]["default_originate"]
    for name, peer_dict in neigh_data.items():
        nh_details = topo[name]

        # The peer address is read from the link named by "dest-link",
        # else by "add_type", else from the link towards `router`.
        if "dest-link" in peer_dict:
            link_name = peer_dict["dest-link"]
        elif "add_type" in peer_dict:
            link_name = peer_dict["add_type"]
        else:
            link_name = router
        neighbor_ip = nh_details["links"][link_name][addr_type].split("/")[0]

        config_data.append("address-family {} unicast".format(addr_type))

        cmd = "neighbor {} default-originate".format(neighbor_ip)
        if "route_map" in peer_dict:
            cmd = "{} route-map {}".format(cmd, peer_dict["route_map"])
        if peer_dict.get("delete"):
            cmd = "no {}".format(cmd)
        config_data.append(cmd)

    logger.debug("Exiting lib API: __create_bgp_default_originate_neighbor()")

    return config_data
def _evpn_neighbor_commands(neighbor_ip, action, router):
    """Return the l2vpn evpn commands for one neighbor address.

    `action` is either the string "activate"/"deactivate" or a dict that may
    hold "next_hop_self" (True/False) and "route_maps" (list of dicts with
    "name", "direction" and "delete"). Any other value yields no commands.
    """
    cmds = []

    if isinstance(action, dict):
        next_hop_self = action.setdefault("next_hop_self", None)
        route_maps = action.setdefault("route_maps", {})

        if next_hop_self is True:
            cmds.append("neighbor {} next-hop-self".format(neighbor_ip))
        elif next_hop_self is False:
            cmds.append("no neighbor {} next-hop-self".format(neighbor_ip))

        for route_map in route_maps:
            name = route_map.setdefault("name", {})
            direction = route_map.setdefault("direction", "in")
            del_action = route_map.setdefault("delete", False)

            if not name:
                logger.info(
                    "Router %s: 'name' not present in "
                    "input_dict for BGP neighbor route name",
                    router,
                )
                continue

            cmd = "neighbor {} route-map {} {}".format(neighbor_ip, name, direction)
            if del_action:
                cmd = "no {}".format(cmd)
            cmds.append(cmd)
    elif action == "activate":
        cmds.append("neighbor {} activate".format(neighbor_ip))
    elif action == "deactivate":
        cmds.append("no neighbor {} activate".format(neighbor_ip))

    return cmds


def __create_l2vpn_evpn_address_family(
    tgen, topo, input_dict, router, config_data=None
):
    """
    Helper API to create configuration for l2vpn evpn address-family

    Parameters
    ----------
    * `tgen` : Topogen object
    * `topo` : json file data
    * `input_dict` : Input dict data, required when configuring
                     from testcase
    * `router` : router id to be configured.
    * `config_data` : command list to extend (a new list when None)

    Returns
    -------
    list of config commands (the same list object as `config_data` when one
    was supplied)
    """
    # NOTE(review): restored from a listing that had lost its short lines;
    # confirm the branch structure upstream.
    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))

    if config_data is None:
        config_data = []

    bgp_data = input_dict["address_family"]

    for family_type, family_dict in bgp_data.items():
        if family_type != "l2vpn":
            continue

        family_data = family_dict["evpn"]
        if family_data:
            config_data.append("address-family l2vpn evpn")

        advertise_data = family_data.setdefault("advertise", {})
        neighbor_data = family_data.setdefault("neighbor", {})
        advertise_all_vni_data = family_data.setdefault("advertise-all-vni", None)
        rd_data = family_data.setdefault("rd", None)
        no_rd_data = family_data.setdefault("no rd", False)
        route_target_data = family_data.setdefault("route-target", {})

        for address_type, unicast_type in advertise_data.items():
            if not isinstance(unicast_type, dict):
                continue
            for key, value in unicast_type.items():
                cmd = "advertise {} {}".format(address_type, key)

                if value:
                    route_map = value.setdefault("route-map", {})
                    advertise_del_action = value.setdefault("delete", None)

                    if route_map:
                        cmd = "{} route-map {}".format(cmd, route_map)

                    if advertise_del_action:
                        cmd = "no {}".format(cmd)

                config_data.append(cmd)

        # A distinct loop variable is used so the dict being walked is not
        # rebound mid-iteration.
        for neighbor, per_neighbor in neighbor_data.items():
            for afi in ("ipv4", "ipv6"):
                # The IPv6 pass used to iterate the IPv4 entries, so IPv6
                # neighbors were never configured; each family now walks
                # its own dict.
                afi_neighbors = per_neighbor.setdefault(afi, {})
                for neighbor_name, action in afi_neighbors.items():
                    neighbor_ip = topo[neighbor]["links"][neighbor_name][afi].split(
                        "/"
                    )[0]
                    config_data.extend(
                        _evpn_neighbor_commands(neighbor_ip, action, router)
                    )

        # Equality (not identity) is kept so 1/0 are still accepted.
        if advertise_all_vni_data == True:  # noqa: E712
            config_data.append("advertise-all-vni")
        elif advertise_all_vni_data == False:  # noqa: E712
            config_data.append("no advertise-all-vni")

        if rd_data:
            config_data.append("rd {}".format(rd_data))

        if no_rd_data:
            config_data.append("no rd {}".format(no_rd_data))

        for rt_type, rt_dict in route_target_data.items():
            for _rt_dict in rt_dict:
                rt_value = _rt_dict.setdefault("value", None)
                del_rt = _rt_dict.setdefault("delete", None)

                if not rt_value:
                    # Without a value there is no command to build; skip
                    # rather than re-append a previous command.
                    continue

                cmd = "route-target {} {}".format(rt_type, rt_value)
                if del_rt:
                    cmd = "no {}".format(cmd)

                config_data.append(cmd)

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))

    return config_data
def __create_bgp_peer_group(topo, input_dict, router):
    """
    Helper API to create peer-group specific configuration

    Parameters
    ----------
    * `topo` : json file data (not read by this helper)
    * `input_dict` : peer-group data keyed by group name; each value may
                     carry "update-source", "remote-as" and "capability"
    * `router` : router id to be configured (not read by this helper)

    Returns
    -------
    list of config commands
    """
    config_data = []
    logger.debug("Entering lib API: __create_bgp_peer_group()")

    for grp, grp_dict in input_dict.items():
        # No trailing blank in the context, matching __create_bgp_neighbor;
        # the old "neighbor <grp> " prefix produced double spaces.
        neigh_cxt = "neighbor {}".format(grp)
        config_data.append("{} peer-group".format(neigh_cxt))

        # The option keys are also the CLI keywords, in emission order.
        for option in ("update-source", "remote-as", "capability"):
            value = grp_dict.get(option)
            if value:
                config_data.append("{} {} {}".format(neigh_cxt, option, value))

    logger.debug("Exiting lib API: __create_bgp_peer_group()")

    return config_data
def __create_bgp_neighbor(topo, input_dict, router, addr_type, add_neigh=True):
    """
    Helper API to create neighbor specific configuration

    Parameters
    ----------
    * `topo` : json file data
    * `input_dict` : Input dict data, required when configuring from testcase
    * `router` : router id to be configured
    * `addr_type` : address family key ("ipv4"/"ipv6") being configured
    * `add_neigh` : when False, the plain "remote-as" statement is not
                    emitted for numbered neighbors

    Returns
    -------
    list of config commands

    Raises
    ------
    AssertionError when a peer's "local_asn" block has identical "local_as"
    and "remote_as" values.
    """
    # NOTE(review): restored from a listing that had lost its short lines
    # (if/else/break/try); confirm the branch structure upstream.
    config_data = []
    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))

    bgp_data = input_dict["address_family"]
    neigh_data = bgp_data[addr_type]["unicast"]["neighbor"]
    global_connect = input_dict.get("connecttimer", 5)

    for name, peer_dict in neigh_data.items():
        remote_as = 0
        for dest_link, peer in peer_dict["dest_link"].items():
            local_as = 0
            no_prepend = replace_as = False
            local_asn = peer.setdefault("local_asn", {})
            if local_asn:
                local_as = local_asn.setdefault("local_as", 0)
                remote_as = local_asn.setdefault("remote_as", 0)
                no_prepend = local_asn.setdefault("no_prepend", False)
                replace_as = local_asn.setdefault("replace_as", False)
                if local_as == remote_as:
                    # Raised explicitly so the check survives `python -O`.
                    raise AssertionError(
                        " Configuration Error : Router must not have "
                        "same AS-NUMBER as Local AS NUMBER"
                    )

            nh_details = topo[name]

            # Work out the peer's AS unless "local_asn" already supplied it.
            if "vrfs" in topo[router] or isinstance(nh_details["bgp"], list):
                for vrf_data in nh_details["bgp"]:
                    if "vrf" in nh_details["links"][dest_link] and "vrf" in vrf_data:
                        if nh_details["links"][dest_link]["vrf"] == vrf_data["vrf"]:
                            if not remote_as:
                                remote_as = vrf_data["local_as"]
                            break
                    elif "vrf" not in vrf_data:
                        if not remote_as:
                            remote_as = vrf_data["local_as"]
                        break
            elif not remote_as:
                remote_as = nh_details["bgp"]["local_as"]

            update_source = None
            neighbor_type = peer.get("neighbor_type")

            if neighbor_type == "unnumbered":
                ip_addr = nh_details["links"][dest_link]["peer-interface"]
            elif neighbor_type == "link-local":
                intf = topo[name]["links"][dest_link]["interface"]
                # Topogen is fetched only where it is actually needed.
                ip_addr = get_frr_ipv6_linklocal(get_topogen(), name, intf)
            elif dest_link in nh_details["links"]:
                try:
                    ip_addr = nh_details["links"][dest_link][addr_type].split("/")[0]
                except KeyError:
                    # No address of this family on the link: use link-local.
                    intf = topo[name]["links"][dest_link]["interface"]
                    ip_addr = get_frr_ipv6_linklocal(get_topogen(), name, intf)

            if peer.get("delete"):
                config_data.append("no neighbor {}".format(ip_addr))
                # NOTE(review): the statement after this append was lost in
                # the listing (return vs continue); continuing keeps the
                # remaining neighbors configured - confirm upstream.
                continue

            neigh_cxt = "neighbor {}".format(ip_addr)

            if "peer-group" in peer:
                config_data.append(
                    "neighbor {} interface peer-group {}".format(
                        ip_addr, peer["peer-group"]
                    )
                )

            # Loopback interface
            if "source_link" in peer:
                if peer["source_link"] == "lo":
                    update_source = topo[router]["links"]["lo"][addr_type].split("/")[0]
                else:
                    update_source = topo[router]["links"][peer["source_link"]][
                        "interface"
                    ]

            if "peer-group" not in peer:
                if neighbor_type == "unnumbered":
                    config_data.append(
                        "{} interface remote-as {}".format(neigh_cxt, remote_as)
                    )
                elif add_neigh:
                    config_data.append("{} remote-as {}".format(neigh_cxt, remote_as))

            if local_asn and local_as:
                cmd = "{} local-as {}".format(neigh_cxt, local_as)
                if no_prepend:
                    cmd = "{} no-prepend".format(cmd)
                if replace_as:
                    cmd = "{} replace-as".format(cmd)
                config_data.append(cmd)

            if addr_type == "ipv6":
                config_data.append("address-family ipv6 unicast")
                config_data.append("{} activate".format(neigh_cxt))

            if neighbor_type == "link-local":
                peer_intf = nh_details["links"][dest_link]["peer-interface"]
                config_data.append("{} update-source {}".format(neigh_cxt, peer_intf))
                config_data.append("{} interface {}".format(neigh_cxt, peer_intf))

            disable_connected = peer.setdefault("disable_connected_check", False)
            connect = peer.get("connecttimer", global_connect)
            keep_alive = peer.setdefault("keepalivetimer", 3)
            hold_down = peer.setdefault("holddowntimer", 10)
            password = peer.setdefault("password", None)
            no_password = peer.setdefault("no_password", None)
            capability = peer.setdefault("capability", None)
            max_hop_limit = peer.setdefault("ebgp_multihop", 1)

            graceful_restart = peer.setdefault("graceful-restart", None)
            graceful_restart_helper = peer.setdefault("graceful-restart-helper", None)
            graceful_restart_disable = peer.setdefault("graceful-restart-disable", None)

            if capability:
                config_data.append("{} capability {}".format(neigh_cxt, capability))

            # update-source used to be emitted twice; once is sufficient.
            if update_source:
                config_data.append(
                    "{} update-source {}".format(neigh_cxt, update_source)
                )

            if disable_connected:
                # Formatted with the neighbor context; the boolean flag was
                # formatted here before, yielding "True disable-connected-check".
                config_data.append("{} disable-connected-check".format(neigh_cxt))

            # NOTE(review): `and` skips the timers line when only ONE of the
            # values equals 60/180; `or` looks intended, but the condition
            # is left unchanged pending confirmation.
            if int(keep_alive) != 60 and int(hold_down) != 180:
                config_data.append(
                    "{} timers {} {}".format(neigh_cxt, keep_alive, hold_down)
                )
            if int(connect) != 120:
                config_data.append("{} timers connect {}".format(neigh_cxt, connect))

            # Tri-state flags: None -> untouched, truthy -> set, False -> "no".
            for flag_value, keyword in (
                (graceful_restart, "graceful-restart"),
                (graceful_restart_helper, "graceful-restart-helper"),
                (graceful_restart_disable, "graceful-restart-disable"),
            ):
                if flag_value:
                    config_data.append("{} {}".format(neigh_cxt, keyword))
                elif flag_value == False:  # noqa: E712 - None must not match
                    config_data.append("no {} {}".format(neigh_cxt, keyword))

            if password:
                config_data.append("{} password {}".format(neigh_cxt, password))

            if no_password:
                config_data.append("no {} password {}".format(neigh_cxt, no_password))

            if max_hop_limit > 1:
                config_data.append(
                    "{} ebgp-multihop {}".format(neigh_cxt, max_hop_limit)
                )
                config_data.append("{} enforce-multihop".format(neigh_cxt))

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))

    return config_data
def __create_bgp_unicast_address_family(
    topo, input_dict, router, addr_type, add_neigh=True
):
    """
    Helper API to create per-neighbor address-family unicast configuration
    (activate/deactivate, next-hop-self, send-community, allowas-in,
    shutdown, prefix-lists and route-maps).

    Parameters
    ----------
    * `topo` : json file data
    * `input_dict` : Input dict data, required when configuring from testcase
    * `router` : router id to be configured
    * `addr_type` : address family key ("ipv4"/"ipv6") being configured
    * `add_neigh` : accepted for signature parity with the sibling helpers;
                    not used here

    Returns
    -------
    list of config commands
    """
    # NOTE(review): restored from a listing that had lost its short lines
    # (if/else/try); confirm the branch structure upstream.
    config_data = []
    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))

    bgp_data = input_dict["address_family"]
    neigh_data = bgp_data[addr_type]["unicast"]["neighbor"]

    # Iterate over a copy: the setdefault() calls below must not write
    # defaults back into the caller's neighbor data.
    for peer_name, peer_dict in deepcopy(neigh_data).items():
        for dest_link, peer in peer_dict["dest_link"].items():
            deactivate = None
            nh_details = topo[peer_name]
            activate_addr_family = peer.setdefault("activate", None)
            deactivate_addr_family = peer.setdefault("deactivate", None)
            neighbor_type = peer.get("neighbor_type")

            # Loopback interface
            if peer.get("source_link") == "lo":
                for dest_router_link, data in sorted(nh_details["links"].items()):
                    if (
                        data.get("type") == "loopback"
                        and dest_link == dest_router_link
                    ):
                        ip_addr = data[addr_type].split("/")[0].lower()

            # Physical interface
            # check the neighbor type if un numbered nbr, use interface.
            elif neighbor_type == "unnumbered":
                ip_addr = nh_details["links"][dest_link]["peer-interface"]
            elif neighbor_type == "link-local":
                intf = topo[peer_name]["links"][dest_link]["interface"]
                # Topogen is fetched only where it is actually needed.
                ip_addr = get_frr_ipv6_linklocal(get_topogen(), peer_name, intf)
            elif dest_link in nh_details["links"]:
                try:
                    ip_addr = nh_details["links"][dest_link][addr_type].split("/")[0]
                except KeyError:
                    intf = topo[peer_name]["links"][dest_link]["interface"]
                    ip_addr = get_frr_ipv6_linklocal(get_topogen(), peer_name, intf)

                # Under the IPv4 family, remember the peer's IPv6 address so
                # it can be deactivated there further down.
                if (
                    addr_type == "ipv4"
                    and bgp_data.get("ipv6")
                    and check_address_types("ipv6")
                    and "ipv6" in nh_details["links"][dest_link]
                ):
                    deactivate = nh_details["links"][dest_link]["ipv6"].split("/")[0]

            neigh_cxt = "neighbor {}".format(ip_addr)
            config_data.append("address-family {} unicast".format(addr_type))

            if activate_addr_family is not None:
                config_data.append(
                    "address-family {} unicast".format(activate_addr_family)
                )
                config_data.append("{} activate".format(neigh_cxt))

            if deactivate and activate_addr_family is None:
                config_data.append("no neighbor {} activate".format(deactivate))

            if deactivate_addr_family is not None:
                config_data.append(
                    "address-family {} unicast".format(deactivate_addr_family)
                )
                config_data.append("no {} activate".format(neigh_cxt))

            next_hop_self = peer.setdefault("next_hop_self", None)
            send_community = peer.setdefault("send_community", None)
            prefix_lists = peer.setdefault("prefix_lists", {})
            route_maps = peer.setdefault("route_maps", {})
            no_send_community = peer.setdefault("no_send_community", None)
            capability = peer.setdefault("capability", None)
            allowas_in = peer.setdefault("allowas-in", None)

            # next-hop-self: only a literal True enables it, any other
            # non-None value removes it.
            if next_hop_self is not None:
                if next_hop_self is True:
                    config_data.append("{} next-hop-self".format(neigh_cxt))
                else:
                    config_data.append("no {} next-hop-self".format(neigh_cxt))

            if send_community:
                config_data.append("{} send-community".format(neigh_cxt))

            if no_send_community:
                config_data.append(
                    "no {} send-community {}".format(neigh_cxt, no_send_community)
                )

            # With a capability set on an IPv6 neighbor, also activate the
            # neighbor under the IPv4 family.
            if capability and addr_type == "ipv6":
                config_data.append("address-family ipv4 unicast")
                config_data.append("{} activate".format(neigh_cxt))

            # "allowas_in"/"no_allowas_in" (underscore) carry a plain value;
            # "allowas-in" (hyphen, handled last) carries a dict.
            if "allowas_in" in peer:
                config_data.append(
                    "{} allowas-in {}".format(neigh_cxt, peer["allowas_in"])
                )

            if "no_allowas_in" in peer:
                config_data.append(
                    "no {} allowas-in {}".format(neigh_cxt, peer["no_allowas_in"])
                )

            if "shutdown" in peer:
                # Built without the leading blank the old two-slot format
                # produced for the non-"no" form.
                shutdown_cmd = "{} shutdown".format(neigh_cxt)
                if not peer["shutdown"]:
                    shutdown_cmd = "no {}".format(shutdown_cmd)
                config_data.append(shutdown_cmd)

            for prefix_list in prefix_lists:
                name = prefix_list.setdefault("name", {})
                direction = prefix_list.setdefault("direction", "in")
                del_action = prefix_list.setdefault("delete", False)
                if not name:
                    logger.info(
                        "Router %s: 'name' not present in "
                        "input_dict for BGP neighbor prefix lists",
                        router,
                    )
                    continue
                cmd = "{} prefix-list {} {}".format(neigh_cxt, name, direction)
                if del_action:
                    cmd = "no {}".format(cmd)
                config_data.append(cmd)

            for route_map in route_maps:
                name = route_map.setdefault("name", {})
                direction = route_map.setdefault("direction", "in")
                del_action = route_map.setdefault("delete", False)
                if not name:
                    logger.info(
                        "Router %s: 'name' not present in "
                        "input_dict for BGP neighbor route name",
                        router,
                    )
                    continue
                cmd = "{} route-map {} {}".format(neigh_cxt, name, direction)
                if del_action:
                    cmd = "no {}".format(cmd)
                config_data.append(cmd)

            if allowas_in:
                number_occurences = allowas_in.setdefault("number_occurences", {})
                del_action = allowas_in.setdefault("delete", False)

                cmd = "{} allowas-in {}".format(neigh_cxt, number_occurences)
                if del_action:
                    cmd = "no {}".format(cmd)

                config_data.append(cmd)

    return config_data
def modify_bgp_config_when_bgpd_down(tgen, topo, input_dict):
    """
    API will save the current config to router's /etc/frr/ for BGPd
    daemon(bgpd.conf file)

    The BGP commands are generated with create_router_bgp() without loading
    them (load_config=False); the generated file under `tgen.logdir` is
    then appended to /etc/frr/bgpd.conf on every router named in
    `input_dict`.

    Parameters
    ----------
    * `tgen` : Topogen object
    * `topo` : json file data
    * `input_dict` : defines for which router, and which config
                     needs to be modified

    Usage
    -----
    # Api call to delete advertised networks
    input_dict = {
        "r1": {
            "bgp": {
                "address_family": {
                    "ipv4": {
                        "unicast": {
                            "advertise_networks": [
                                {"network": "101.0.20.1/32", "delete": True}
                            ]
                        }
                    }
                }
            }
        }
    }
    result = modify_bgp_config_when_bgpd_down(tgen, topo, input_dict)

    Returns
    -------
    True on success; the non-True result of create_router_bgp() when that
    step fails; or the formatted traceback (str) when an exception is raised.
    """
    # NOTE(review): restored from a listing that had lost its short lines
    # (try/return/continue); confirm against upstream.
    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))

    try:
        result = create_router_bgp(
            tgen, topo, input_dict, build=False, load_config=False
        )
        if result is not True:
            return result

        # Copy bgp config file to /etc/frr
        router_list = tgen.routers()
        for dut in input_dict:
            if dut not in router_list:
                continue

            logger.info("Delete BGP config when BGPd is down in {}".format(dut))
            # Reading the config from "rundir" and copy to /etc/frr/bgpd.conf
            cmd = "cat {}/{}/{} >> /etc/frr/bgpd.conf".format(
                tgen.logdir, dut, FRRCFG_FILE
            )
            router_list[dut].run(cmd)

    except Exception:
        # Errors are reported to the test as a string instead of being
        # propagated.
        errormsg = traceback.format_exc()
        logger.error(errormsg)
        return errormsg

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return True
1258 #############################################
1260 #############################################
@retry(retry_timeout=8)
def verify_router_id(tgen, topo, input_dict, expected=True):
    """
    Running command "show bgp summary json" for DUT and reading router-id
    from input_dict and verifying with command output.
    1. Statically modified router-id should take place
    2. When static router-id is deleted, the value returned by
       find_interface_with_greater_ip() is expected as router-id
    3. When loopback intf is down then highest physical intf
       should become router-id

    Parameters
    ----------
    * `tgen`: topogen object
    * `topo`: input json file data
    * `input_dict`: input dictionary, have details of Device Under Test, for
                    which user wants to test the data
    * `expected` : expected results from API, by-default True
                   (not read in this function body)

    Usage
    -----
    # Verify if router-id for r1 is 12.12.12.12
    input_dict = {
        "r1": {
            "bgp": {
                "router_id": "12.12.12.12"
            }
        }
    }
    # Verify that router-id for r1 is highest interface ip
    input_dict = {
        "r1": {
            "bgp": {
                "del_router_id": True
            }
        }
    }
    result = verify_router_id(tgen, topo, input_dict)

    Returns
    -------
    errormsg(str) or True
    """
    # NOTE(review): restored from a listing that had lost its short lines
    # (if/else/continue/return); confirm against upstream.
    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))

    router_list = tgen.routers()
    for router in input_dict:
        if router not in router_list:
            continue

        rnode = router_list[router]
        bgp_input = input_dict[router]["bgp"]

        # Read-only lookup: the caller's dict is left unmodified.
        del_router_id = bgp_input.get("del_router_id", False)

        logger.info("Checking router %s router-id", router)
        show_bgp_json = run_frr_cmd(rnode, "show bgp summary json", isjson=True)
        router_id_out = show_bgp_json["ipv4Unicast"]["routerId"]
        router_id_out = ipaddress.IPv4Address(frr_unicode(router_id_out))

        # Once router-id is deleted, highest interface ip should become
        # router-id
        if del_router_id:
            router_id = find_interface_with_greater_ip(topo, router)
        else:
            router_id = bgp_input["router_id"]
        router_id = ipaddress.IPv4Address(frr_unicode(router_id))

        if router_id != router_id_out:
            errormsg = (
                "Router-id for router:{} mismatch, expected:"
                " {} but found:{}".format(router, router_id, router_id_out)
            )
            return errormsg

        logger.info("Found expected router-id %s for router %s", router_id, router)

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return True
@retry(retry_timeout=150)
def verify_bgp_convergence(tgen, topo=None, dut=None, expected=True):
    """
    Check that every BGP session declared in the topology is Established.

    "show bgp vrf all summary json" is run on each router that has a "bgp"
    section in the topology.  For every BGP instance (one per VRF) either
    the l2vpn/evpn neighbors or the ipv4/ipv6 unicast neighbors are looked
    up in the command output and their state is compared with
    "Established".

    Parameters
    ----------
    * `tgen`: topogen object
    * `topo`: input json file data, tgen.json_topo is used when None
    * `dut`: device under test, every router is checked when None
    * `expected` : not read in this body; presumably consumed by @retry

    Usage
    -----
    # To verify if BGP is converged for router r1
    results = verify_bgp_convergence(tgen, topo, dut="r1")

    Returns
    -------
    errormsg(str) on the first non-converged instance, True when at least
    one instance was checked and converged, otherwise False
    """

    if topo is None:
        topo = tgen.json_topo

    result = False
    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
    # From here on the global topogen object is used, not the argument.
    tgen = get_topogen()
    for router, rnode in tgen.routers().items():
        if "bgp" not in topo["routers"][router]:
            continue

        if not (dut is None or dut == router):
            continue

        logger.info("Verifying BGP Convergence on router %s:", router)
        show_bgp_json = run_frr_cmd(rnode, "show bgp vrf all summary json", isjson=True)
        # An empty summary means bgpd produced no output at all
        if not bool(show_bgp_json):
            return "BGP is not running"

        # A router carries either a single bgp dict or a list of them
        bgp_instances = topo["routers"][router]["bgp"]
        if type(bgp_instances) is not list:
            bgp_instances = [bgp_instances]

        for bgp_data in bgp_instances:
            vrf = bgp_data["vrf"] if "vrf" in bgp_data else None
            if vrf is None:
                vrf = "default"

            bgp_addr_type = bgp_data["address_family"]

            if "l2vpn" in bgp_addr_type:
                if "neighbor" not in bgp_addr_type["l2vpn"]["evpn"]:
                    continue

                evpn_neighbors = bgp_addr_type["l2vpn"]["evpn"]["neighbor"]
                total_evpn_peer = len(evpn_neighbors)
                no_of_evpn_peer = 0

                for bgp_neighbor, peer_data in evpn_neighbors.items():
                    for _addr_type, dest_link_dict in peer_data.items():
                        links = topo["routers"][bgp_neighbor]["links"]
                        for dest_link in dest_link_dict.keys():
                            if dest_link not in links:
                                continue

                            neighbor_ip = links[dest_link][_addr_type].split("/")[0]

                            # Unicast families must not show up for an
                            # evpn-only instance
                            if (
                                "ipv4Unicast" in show_bgp_json[vrf]
                                or "ipv6Unicast" in show_bgp_json[vrf]
                            ):
                                return (
                                    "[DUT: %s] VRF: %s, "
                                    "ipv4Unicast/ipv6Unicast"
                                    " address-family present"
                                    " under l2vpn" % (router, vrf)
                                )

                            evpn_peers = show_bgp_json[vrf]["l2VpnEvpn"]["peers"]
                            if evpn_peers[neighbor_ip]["state"] == "Established":
                                no_of_evpn_peer += 1

                if no_of_evpn_peer != total_evpn_peer:
                    return (
                        "[DUT: %s] VRF: %s, BGP is not converged "
                        "for evpn peers" % (router, vrf)
                    )

                logger.info(
                    "[DUT: %s] VRF: %s, BGP is Converged for " "epvn peers",
                    router,
                    vrf,
                )
                result = True
                continue

            # ipv4/ipv6 unicast instance: first count the expected sessions
            total_peer = 0
            for addr_type in bgp_addr_type.keys():
                if not check_address_types(addr_type):
                    continue

                unicast_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]
                total_peer += sum(
                    len(neighbor["dest_link"]) for neighbor in unicast_neighbors.values()
                )

            # ... then count the sessions that are actually Established
            no_of_peer = 0
            for addr_type in bgp_addr_type.keys():
                if not check_address_types(addr_type):
                    continue

                unicast_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]
                for bgp_neighbor, peer_data in unicast_neighbors.items():
                    for dest_link in peer_data["dest_link"].keys():
                        links = topo["routers"][bgp_neighbor]["links"]
                        if dest_link not in links:
                            continue

                        peer_details = peer_data["dest_link"][dest_link]
                        neighbor_type = peer_details.get("neighbor_type")

                        if neighbor_type == "link-local":
                            # for link local neighbors
                            intf = topo["routers"][bgp_neighbor]["links"][dest_link][
                                "interface"
                            ]
                            neighbor_ip = get_frr_ipv6_linklocal(
                                tgen, bgp_neighbor, intf
                            )
                        elif "source_link" in peer_details:
                            source_link = peer_details["source_link"]
                            neighbor_ip = topo["routers"][bgp_neighbor]["links"][
                                source_link
                            ][addr_type].split("/")[0]
                        elif neighbor_type == "unnumbered":
                            neighbor_ip = links[dest_link]["peer-interface"]
                        else:
                            neighbor_ip = links[dest_link][addr_type].split("/")[0]

                        nh_state = None
                        neighbor_ip = neighbor_ip.lower()
                        if addr_type == "ipv4":
                            ipv4_peers = show_bgp_json[vrf]["ipv4Unicast"]["peers"]
                            nh_state = ipv4_peers[neighbor_ip]["state"]
                        else:
                            ipv6_peers = show_bgp_json[vrf]["ipv6Unicast"]["peers"]
                            if neighbor_ip in ipv6_peers:
                                nh_state = ipv6_peers[neighbor_ip]["state"]

                        if nh_state == "Established":
                            no_of_peer += 1

            if not (no_of_peer == total_peer and no_of_peer > 0):
                return "[DUT: %s] VRF: %s, BGP is not converged" % (router, vrf)

            logger.info("[DUT: %s] VRF: %s, BGP is Converged", router, vrf)
            result = True

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return result
@retry(retry_timeout=16)
def verify_bgp_community(
    tgen,
    addr_type,
    router,
    network,
    input_dict=None,
    vrf=None,
    bestpath=False,
    expected=True,
):
    """
    API to verify BGP (large) community is attached in route for any given
    DUT by running "show bgp ipv4/6 {route address} json" command.

    Parameters
    ----------
    * `tgen`: topogen object
    * `addr_type` : ip type, ipv4/ipv6
    * `router`: Device Under Test
    * `network`: list of networks for which set criteria needs to be verified
    * `input_dict`: having details like - for which router, community and
            values needs to be verified
    * `vrf`: VRF name, when set the "vrf" form of the command is used
    * `bestpath`: To check best path cli
    * `expected` : expected results from API, by-default True
                   (not read in this body; presumably consumed by @retry)

    Usage
    -----
    networks = ["200.50.2.0/32"]
    input_dict = {
        "largeCommunity": "2:1:1 2:2:2 2:3:3 2:4:4 2:5:5"
    }
    result = verify_bgp_community(tgen, "ipv4", dut, network, input_dict=None)

    Returns
    -------
    errormsg(str) or True; False when `router` is not part of the topology
    """

    logger.debug("Entering lib API: verify_bgp_community()")
    if router not in tgen.routers():
        return False

    rnode = tgen.routers()[router]

    logger.info(
        "Verifying BGP community attributes on dut %s: for %s " "network %s",
        router,
        addr_type,
        network,
    )

    command = "show bgp"

    for net in network:
        if vrf:
            cmd = "{} vrf {} {} {} json".format(command, vrf, addr_type, net)
        elif bestpath:
            cmd = "{} {} {} bestpath json".format(command, addr_type, net)
        else:
            cmd = "{} {} {} json".format(command, addr_type, net)

        show_bgp_json = run_frr_cmd(rnode, cmd, isjson=True)
        if "paths" not in show_bgp_json:
            return "Prefix {} not found in BGP table of router: {}".format(net, router)

        found = False
        for path in show_bgp_json["paths"]:
            if "largeCommunity" not in path and "community" not in path:
                continue

            found = True
            logger.info(
                "Large Community attribute is found for route:" " %s in router: %s",
                net,
                router,
            )
            if input_dict is None:
                continue

            for criteria, comm_val in input_dict.items():
                # A path may carry only one of community/largeCommunity;
                # report that as a mismatch instead of raising KeyError.
                attr = path.get(criteria)
                show_val = attr.get("string") if isinstance(attr, dict) else None
                if comm_val == show_val:
                    logger.info(
                        "Verifying BGP %s for prefix: %s"
                        " in router: %s, found expected"
                        " value: %s",
                        criteria,
                        net,
                        router,
                        comm_val,
                    )
                else:
                    errormsg = (
                        "Failed: Verifying BGP attribute"
                        " {} for route: {} in router: {}"
                        ", expected value: {} but found"
                        ": {}".format(criteria, net, router, comm_val, show_val)
                    )
                    return errormsg

        if not found:
            errormsg = (
                "Large Community attribute is not found for route: "
                "{} in router: {} ".format(net, router)
            )
            return errormsg

    logger.debug("Exiting lib API: verify_bgp_community()")
    return True
def modify_as_number(tgen, topo, input_dict):
    """
    Change the local AS number of the routers listed in input_dict.

    The existing BGP configuration of each listed router is deleted and
    the complete BGP configuration is then re-created from a copy of the
    topology in which "local_as" has been replaced by the requested value.

    Parameters
    ----------
    * `tgen`  : Topogen object
    * `topo`  : json file data
    * `input_dict` :  defines for which router ASNs needs to be modified

    Usage
    -----
    To modify ASNs for router r1
    input_dict = {
        "r1": {
            "bgp": {
                "local_as": 131079
            }
        }
    }
    result = modify_as_number(tgen, topo, input_dict)

    Returns
    -------
    errormsg(str) or True
    """

    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
    try:
        new_topo = deepcopy(topo["routers"])
        delete_request = {}
        for router in input_dict.keys():
            # Schedule removal of the current bgp configuration
            delete_request.update({router: {"bgp": {"delete": True}}})

            bgp_cfg = new_topo[router]["bgp"]
            try:
                # single bgp instance: plain dictionary
                bgp_cfg["local_as"] = input_dict[router]["bgp"]["local_as"]
            except TypeError:
                # list of bgp instances: only the first one is updated
                bgp_cfg[0]["local_as"] = input_dict[router]["bgp"]["local_as"]

        logger.info("Removing bgp configuration")
        create_router_bgp(tgen, topo, delete_request)

        logger.info("Applying modified bgp configuration")
        result = create_router_bgp(tgen, new_topo)
        if result is not True:
            result = "Error applying new AS number config"
    except Exception:
        errormsg = traceback.format_exc()
        logger.error(errormsg)
        return errormsg

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return result
@retry(retry_timeout=8)
def verify_as_numbers(tgen, topo, input_dict, expected=True):
    """
    This API is to verify AS numbers for given DUT by running
    "show ip bgp neighbor json" command. Local AS and Remote AS
    will be verified with input_dict data and command output.

    The remote AS expected for a neighbor is that neighbor's own
    "local_as" entry in input_dict, so every neighbor of a listed router
    must be present in input_dict as well.

    Parameters
    ----------
    * `tgen`: topogen object
    * `topo`: input json file data
    * `input_dict`: defines - for which router, AS numbers needs to be verified
    * `expected` : expected results from API, by-default True
                   (not read in this body; presumably consumed by @retry)

    Usage
    -----
    input_dict = {
        "r1": {
            "bgp": {
                "local_as": 131079
            }
        }
    }
    result = verify_as_numbers(tgen, topo, input_dict)

    Returns
    -------
    errormsg(str) or True
    """

    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
    for router in input_dict.keys():
        if router not in tgen.routers():
            continue

        rnode = tgen.routers()[router]

        logger.info("Verifying AS numbers for dut %s:", router)

        show_ip_bgp_neighbor_json = run_frr_cmd(
            rnode, "show ip bgp neighbor json", isjson=True
        )
        local_as = input_dict[router]["bgp"]["local_as"]

        # The topology holds either one bgp dict or a list of instances;
        # fall back to the first instance like the sibling APIs do.
        bgp_topo = topo["routers"][router]["bgp"]
        try:
            bgp_addr_type = bgp_topo["address_family"]
        except TypeError:
            bgp_addr_type = bgp_topo[0]["address_family"]

        for addr_type in bgp_addr_type:
            if not check_address_types(addr_type):
                continue

            bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]

            for bgp_neighbor, peer_data in bgp_neighbors.items():
                remote_as = input_dict[bgp_neighbor]["bgp"]["local_as"]
                links = topo["routers"][bgp_neighbor]["links"]
                for dest_link in peer_data["dest_link"]:
                    # Without a matching link there is no neighbor address
                    # to look up in the command output.
                    if dest_link not in links:
                        continue

                    neighbor_ip = links[dest_link][addr_type].split("/")[0]
                    neigh_data = show_ip_bgp_neighbor_json[neighbor_ip]

                    # Verify Local AS for router
                    if neigh_data["localAs"] != local_as:
                        errormsg = (
                            "Failed: Verify local_as for dut {},"
                            " found: {} but expected: {}".format(
                                router, neigh_data["localAs"], local_as
                            )
                        )
                        return errormsg

                    logger.info(
                        "Verified local_as for dut %s, found" " expected: %s",
                        router,
                        local_as,
                    )

                    # Verify Remote AS for neighbor
                    if neigh_data["remoteAs"] != remote_as:
                        errormsg = (
                            "Failed: Verify remote_as for dut "
                            "{}'s neighbor {}, found: {} but "
                            "expected: {}".format(
                                router, bgp_neighbor, neigh_data["remoteAs"], remote_as
                            )
                        )
                        return errormsg

                    logger.info(
                        "Verified remote_as for dut %s's "
                        "neighbor %s, found expected: %s",
                        router,
                        bgp_neighbor,
                        remote_as,
                    )

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return True
@retry(retry_timeout=150)
def verify_bgp_convergence_from_running_config(tgen, dut=None, expected=True):
    """
    API to verify BGP convergence b/w loopback and physical interface.
    This API would be used when routers have BGP neighborship is loopback
    to physical or vice-versa

    The peers are taken from the output of "show bgp vrf all summary json"
    itself (not from the topology): every peer listed under every VRF and
    address family must be in state "Established".

    Parameters
    ----------
    * `tgen`: topogen object
    * `dut`: device under test, every router is checked when None
    * `expected` : expected results from API, by-default True
                   (not read in this body; presumably consumed by @retry)

    Usage
    -----
    results = verify_bgp_convergence_from_running_config(tgen, dut="r1")

    Returns
    -------
    errormsg(str) or True
    """

    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))

    for router, rnode in tgen.routers().items():
        if dut is not None and dut != router:
            continue

        logger.info("Verifying BGP Convergence on router %s:", router)
        show_bgp_json = run_frr_cmd(rnode, "show bgp vrf all summary json", isjson=True)
        # Verifying output dictionary show_bgp_json is empty or not
        if not bool(show_bgp_json):
            errormsg = "BGP is not running"
            return errormsg

        for vrf, addr_family_data in show_bgp_json.items():
            for neighborship_data in addr_family_data.values():
                # Name the peers that are really down; reporting the last
                # peer of the iteration would point at a healthy session.
                down_peers = [
                    peer
                    for peer, peer_data in neighborship_data["peers"].items()
                    if peer_data["state"] != "Established"
                ]
                if down_peers:
                    errormsg = (
                        "[DUT: %s] VRF: %s, BGP is not converged"
                        " for peer: %s" % (router, vrf, ", ".join(down_peers))
                    )
                    return errormsg

            logger.info("[DUT: %s]: vrf: %s, BGP is Converged", router, vrf)

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))

    return True
def clear_bgp(tgen, addr_type, router, vrf=None, neighbor=None):
    """
    Reset BGP sessions on a router with the matching "clear" command.

    For "ipv4"/"ipv6" the sessions of the given VRF(s) are cleared when
    `vrf` is set, otherwise the single `neighbor` when that is set,
    otherwise all sessions of the address family.  Any other `addr_type`
    clears everything with "clear bgp *".

    Parameters
    ----------
    * `tgen`: topogen object
    * `addr_type`: ip type ipv4/ipv6
    * `router`: device under test
    * `vrf`: VRF name or list of VRF names
    * `neighbor`: Neighbor for which bgp needs to be cleared

    Usage
    -----
    clear_bgp(tgen, addr_type, "r1")

    Returns
    -------
    False when `router` is not part of the topology, otherwise None
    """

    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))

    routers = tgen.routers()
    if router not in routers:
        return False

    rnode = routers[router]

    # A single VRF name is handled as a one-element list
    if vrf and type(vrf) is not list:
        vrf = [vrf]

    # (per-vrf, per-neighbor, everything) command forms per address family
    if addr_type == "ipv4":
        forms = ("clear ip bgp vrf {} *", "clear bgp ipv4 {}", "clear ip bgp *")
    elif addr_type == "ipv6":
        forms = ("clear bgp vrf {} ipv6 *", "clear bgp ipv6 {}", "clear bgp ipv6 *")
    else:
        forms = None

    # Clearing BGP
    logger.info("Clearing BGP neighborship for router %s..", router)
    if forms is None:
        commands = ["clear bgp *"]
    elif vrf:
        commands = [forms[0].format(_vrf) for _vrf in vrf]
    elif neighbor:
        commands = [forms[1].format(neighbor)]
    else:
        commands = [forms[2]]

    for command in commands:
        run_frr_cmd(rnode, command)

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
def clear_bgp_and_verify(tgen, topo, router, rid=None):
    """
    This API is to clear bgp neighborship and verify bgp neighborship
    is coming up(BGP is converged) using "show bgp summary json" command
    and also verifying for all bgp neighbors uptime before and after
    clear bgp sessions is different as the uptime must be changed once
    bgp sessions are cleared using "clear bgp ipv4 */clear bgp ipv6 *" cmd.

    Parameters
    ----------
    * `tgen`: topogen object
    * `topo`: input json file data
    * `router`: device under test
    * `rid`: when set, only this peer is cleared instead of "*"

    Usage
    -----
    result = clear_bgp_and_verify(tgen, topo, dut)

    Returns
    -------
    errormsg(str) or True; False when `router` is not part of the topology
    """

    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))

    if router not in tgen.routers():
        return False

    rnode = tgen.routers()[router]

    # Verifying BGP convergence before bgp clear command
    errormsg, peer_uptime_before_clear_bgp, bgp_addr_type = _wait_for_bgp_convergence(
        rnode, topo, router, "before"
    )
    if errormsg is not None:
        return errormsg

    # Clearing BGP
    logger.info("Clearing BGP neighborship for router %s..", router)
    target = rid if rid else "*"
    for addr_type in bgp_addr_type.keys():
        if addr_type in ("ipv4", "ipv6"):
            run_frr_cmd(rnode, "clear bgp {} {}".format(addr_type, target))

    # Verifying BGP convergence after bgp clear command
    errormsg, peer_uptime_after_clear_bgp, _ = _wait_for_bgp_convergence(
        rnode, topo, router, "after"
    )
    if errormsg is not None:
        return errormsg

    # Comparing peerUptimeEstablishedEpoch dictionaries
    if peer_uptime_before_clear_bgp != peer_uptime_after_clear_bgp:
        logger.info("BGP neighborship is reset after clear BGP on router %s", router)
    else:
        errormsg = (
            "BGP neighborship is not reset after clear bgp on router"
            " {}".format(router)
        )
        return errormsg

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return True


def _wait_for_bgp_convergence(rnode, topo, router, phase, sleeptime=3, attempts=50):
    """
    Poll "show bgp summary json" until all configured peers are Established.

    `phase` ("before"/"after") is only used in the log messages.  Returns a
    tuple (errormsg or None, {neighbor name: peerUptimeEstablishedEpoch},
    address_family dict of the router taken from `topo`).
    """

    peer_uptime = {}
    bgp_addr_type = None
    attempt = 0
    for attempt in range(attempts):
        # Waiting for BGP to converge
        logger.info(
            "Waiting for %s sec for BGP to converge on router" " %s...",
            sleeptime,
            router,
        )
        sleep(sleeptime)

        show_bgp_json = run_frr_cmd(rnode, "show bgp summary json", isjson=True)
        # Verifying output dictionary show_bgp_json is empty or not
        if not bool(show_bgp_json):
            return "BGP is not running", peer_uptime, bgp_addr_type

        # The topology holds either one bgp dict or a list of instances
        try:
            bgp_addr_type = topo["routers"][router]["bgp"]["address_family"]
        except TypeError:
            bgp_addr_type = topo["routers"][router]["bgp"][0]["address_family"]

        total_peer = 0
        no_of_peer = 0
        for addr_type, af_data in bgp_addr_type.items():
            # Same filter for counting and for state lookup, so that a
            # non-ip family cannot be dereferenced as "unicast".
            if not check_address_types(addr_type):
                continue

            summary_key = "ipv4Unicast" if addr_type == "ipv4" else "ipv6Unicast"
            for bgp_neighbor, peer_data in af_data["unicast"]["neighbor"].items():
                total_peer += len(peer_data["dest_link"])
                links = topo["routers"][bgp_neighbor]["links"]

                for dest_link in peer_data["dest_link"]:
                    if dest_link not in links:
                        continue

                    neighbor_ip = links[dest_link][addr_type].split("/")[0]
                    peers = show_bgp_json[summary_key]["peers"]
                    nh_state = peers[neighbor_ip]["state"]

                    # Peer up time dictionary
                    peer_uptime[bgp_neighbor] = peers[neighbor_ip][
                        "peerUptimeEstablishedEpoch"
                    ]

                    if nh_state == "Established":
                        no_of_peer += 1

        if no_of_peer == total_peer:
            logger.info("BGP is Converged for router %s %s bgp clear", router, phase)
            return None, peer_uptime, bgp_addr_type

        logger.info(
            "BGP is not yet Converged for router %s %s bgp clear", router, phase
        )

    errormsg = (
        "TIMEOUT!! BGP is not converged in {} seconds for"
        " router {}".format(attempt * sleeptime, router)
    )
    return errormsg, peer_uptime, bgp_addr_type
def verify_bgp_timers_and_functionality(tgen, topo, input_dict):
    """
    To verify BGP timer config, execute "show ip bgp neighbor json" command
    and verify bgp timers with input_dict data.
    To verify bgp timers functionality, shutting down peer interface
    and verify BGP neighborship status.

    Parameters
    ----------
    * `tgen`: topogen object
    * `topo`: input json file data
    * `input_dict`: defines for which router, bgp timers needs to be verified

    Usage:
    # To verify BGP timers for neighbor r2 of router r1
    input_dict = {
        "r1": {
           "bgp": {
               "address_family": {
                   "ipv4": {
                       "unicast": {
                           "neighbor": {
                               "r2": {
                                   "dest_link": {
                                       "r1": {
                                           "keepalivetimer": 5,
                                           "holddowntimer": 15,
                                       }}}}}}}}}}
    result = verify_bgp_timers_and_functionality(tgen, topo, input_dict)

    Returns
    -------
    errormsg(str) or True
    """

    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
    sleep(5)
    router_list = tgen.routers()
    for router in input_dict.keys():
        if router not in router_list:
            continue

        rnode = router_list[router]

        logger.info("Verifying bgp timers functionality, DUT is %s:", router)

        show_ip_bgp_neighbor_json = run_frr_cmd(
            rnode, "show ip bgp neighbor json", isjson=True
        )

        bgp_addr_type = input_dict[router]["bgp"]["address_family"]

        for addr_type in bgp_addr_type:
            if not check_address_types(addr_type):
                continue

            bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]
            for bgp_neighbor, peer_data in bgp_neighbors.items():
                neighbor_ip = None
                for dest_link, peer_dict in peer_data["dest_link"].items():
                    data = topo["routers"][bgp_neighbor]["links"]

                    keepalivetimer = peer_dict["keepalivetimer"]
                    holddowntimer = peer_dict["holddowntimer"]

                    # No link in the topology means no address/interface
                    # to verify or to shut down.
                    if dest_link not in data:
                        continue

                    neighbor_ip = data[dest_link][addr_type].split("/")[0]
                    neighbor_intf = data[dest_link]["interface"]

                    # Verify HoldDownTimer for neighbor
                    bgpHoldTimeMsecs = show_ip_bgp_neighbor_json[neighbor_ip][
                        "bgpTimerHoldTimeMsecs"
                    ]
                    if bgpHoldTimeMsecs != holddowntimer * 1000:
                        errormsg = (
                            "Verifying holddowntimer for bgp "
                            "neighbor {} under dut {}, found: {} "
                            "but expected: {}".format(
                                neighbor_ip,
                                router,
                                bgpHoldTimeMsecs,
                                holddowntimer * 1000,
                            )
                        )
                        return errormsg

                    # Verify KeepAliveTimer for neighbor
                    bgpKeepAliveTimeMsecs = show_ip_bgp_neighbor_json[neighbor_ip][
                        "bgpTimerKeepAliveIntervalMsecs"
                    ]
                    if bgpKeepAliveTimeMsecs != keepalivetimer * 1000:
                        errormsg = (
                            "Verifying keepalivetimer for bgp "
                            "neighbor {} under dut {}, found: {} "
                            "but expected: {}".format(
                                neighbor_ip,
                                router,
                                bgpKeepAliveTimeMsecs,
                                keepalivetimer * 1000,
                            )
                        )
                        return errormsg

                    ####################
                    # Shutting down peer interface after keepalive time and
                    # after some time bringing up peer interface.
                    # verifying BGP neighborship in (hold down-keep alive)
                    # time, it should not go down
                    ####################

                    # Wait till keep alive time
                    logger.info("=" * 20)
                    logger.info("Scenario 1:")
                    logger.info(
                        "Shutdown and bring up peer interface: %s "
                        "in keep alive time : %s sec and verify "
                        " BGP neighborship is intact in %s sec ",
                        neighbor_intf,
                        keepalivetimer,
                        (holddowntimer - keepalivetimer),
                    )
                    logger.info("=" * 20)
                    logger.info("Waiting for %s sec..", keepalivetimer)
                    sleep(keepalivetimer)

                    # Shutting down peer interface
                    logger.info(
                        "Shutting down interface %s on router %s",
                        neighbor_intf,
                        bgp_neighbor,
                    )
                    topotest.interface_set_status(
                        router_list[bgp_neighbor], neighbor_intf, ifaceaction=False
                    )

                    # Bringing up peer interface
                    sleep(5)
                    logger.info(
                        "Bringing up interface %s on router %s..",
                        neighbor_intf,
                        bgp_neighbor,
                    )
                    topotest.interface_set_status(
                        router_list[bgp_neighbor], neighbor_intf, ifaceaction=True
                    )

                if neighbor_ip is None:
                    # None of the dest_links resolved; nothing to observe.
                    continue

                # range() rejects a zero step, which int(holddowntimer / 3)
                # would produce for a hold time below 3 seconds.
                step = max(1, int(holddowntimer / 3))

                # Verifying BGP neighborship is intact in
                # (holddown - keepalive) time
                for timer in range(keepalivetimer, holddowntimer, step):
                    logger.info("Waiting for %s sec..", keepalivetimer)
                    sleep(keepalivetimer)
                    sleep(2)
                    nh_state = _bgp_summary_peer_state(rnode, addr_type, neighbor_ip)

                    if timer == (holddowntimer - keepalivetimer):
                        if nh_state != "Established":
                            # The session was expected to survive the flap.
                            errormsg = (
                                "BGP neighborship has gone "
                                "down in {} sec for neighbor {}".format(
                                    timer, bgp_neighbor
                                )
                            )
                            return errormsg

                        logger.info(
                            "BGP neighborship is intact in %s"
                            " sec for neighbor %s",
                            timer,
                            bgp_neighbor,
                        )

                ####################
                # Shutting down peer interface and verifying that BGP
                # neighborship is going down in holddown time
                ####################
                logger.info("=" * 20)
                logger.info("Scenario 2:")
                logger.info(
                    "Shutdown peer interface: %s and verify BGP"
                    " neighborship has gone down in hold down "
                    "time %s sec",
                    neighbor_intf,
                    holddowntimer,
                )
                logger.info("=" * 20)

                logger.info(
                    "Shutting down interface %s on router %s..",
                    neighbor_intf,
                    bgp_neighbor,
                )
                topotest.interface_set_status(
                    router_list[bgp_neighbor], neighbor_intf, ifaceaction=False
                )

                # Verifying BGP neighborship is going down in holddown time
                for timer in range(
                    keepalivetimer,
                    (holddowntimer + keepalivetimer),
                    step,
                ):
                    logger.info("Waiting for %s sec..", keepalivetimer)
                    sleep(keepalivetimer)
                    sleep(2)
                    nh_state = _bgp_summary_peer_state(rnode, addr_type, neighbor_ip)

                    if timer == holddowntimer:
                        if nh_state == "Established":
                            errormsg = (
                                "BGP neighborship has not gone "
                                "down in {} sec for neighbor {}".format(
                                    timer, bgp_neighbor
                                )
                            )
                            return errormsg

                        logger.info(
                            "BGP neighborship has gone down in"
                            " %s sec for neighbor %s",
                            timer,
                            bgp_neighbor,
                        )

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return True


def _bgp_summary_peer_state(rnode, addr_type, neighbor_ip):
    """Return the state of `neighbor_ip` from "show bgp summary json"."""

    show_bgp_json = run_frr_cmd(rnode, "show bgp summary json", isjson=True)
    family = "ipv4Unicast" if addr_type == "ipv4" else "ipv6Unicast"
    return show_bgp_json[family]["peers"][neighbor_ip]["state"]
@retry(retry_timeout=16)
def verify_bgp_attributes(
    tgen,
    addr_type,
    dut,
    static_routes,
    rmap_name=None,
    input_dict=None,
    seq_id=None,
    vrf=None,
    nexthop=None,
    expected=True,
):
    """
    Verify the BGP attributes a route-map sets on the given prefixes.

    "show bgp ipv4/ipv6 {prefix_address} json" is run on the DUT for each
    prefix.  For every "set" criterion of the selected route-map entries
    at least one path of the prefix has to carry the configured value.

    Parameters
    ----------
    * `tgen`: topogen object
    * `addr_type` : ip type, ipv4/ipv6
    * `dut`: Device Under Test
    * `static_routes`: Static Routes for which BGP set attributes needs to be
                       verified
    * `rmap_name`: route map name for which set criteria needs to be verified
    * `input_dict`: route-map definitions, keyed by router
    * `seq_id`: sequence number (or list of them) of rmap, default is None
    * `vrf`: VRF name, when set the "vrf" form of the command is used
    * `nexthop`: not read in this body
    * `expected` : not read in this body; presumably consumed by @retry

    Usage
    -----
    # To verify BGP attribute "localpref" set to 150 and "med" set to 30
    for prefix 10.0.20.1/32 in router r3.
    input_dict = {
        "r3": {
            "route_maps": {
                "rmap_match_pf_list1": [
                    {
                        "action": "PERMIT",
                        "match": {"prefix_list": "pf_list_1"},
                        "set": {"localpref": 150, "med": 30}
                    }
                ],
            },
        }
    }
    static_routes (list) = ["10.0.20.1/32"]

    Returns
    -------
    errormsg(str) or True
    """

    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
    for router, rnode in tgen.routers().items():
        if router != dut:
            continue

        logger.info("Verifying BGP set attributes for dut {}:".format(router))

        for static_route in static_routes:
            if vrf:
                cmd = "show bgp vrf {} {} {} json".format(vrf, addr_type, static_route)
            else:
                cmd = "show bgp {} {} json".format(addr_type, static_route)
            show_bgp_json = run_frr_cmd(rnode, cmd, isjson=True)

            dict_to_test = []
            tmp_list = []

            # Only the first router entry decides whether route-maps exist
            first_entry = list(input_dict.values())[0]
            if "route_maps" not in first_entry:
                continue

            for rmap_router in input_dict.keys():
                for rmap, values in input_dict[rmap_router]["route_maps"].items():
                    if rmap != rmap_name:
                        continue

                    dict_to_test = values

                    # Narrow down to the requested sequence numbers
                    for rmap_dict in values:
                        if seq_id is None:
                            continue
                        if type(seq_id) is not list:
                            seq_id = [seq_id]
                        if "seq_id" not in rmap_dict:
                            continue
                        rmap_seq_id = rmap_dict["seq_id"]
                        tmp_list.extend(
                            rmap_dict for _seq_id in seq_id if _seq_id == rmap_seq_id
                        )

                    if tmp_list:
                        dict_to_test = tmp_list

                    value = None
                    for rmap_dict in dict_to_test:
                        if "set" not in rmap_dict:
                            continue

                        set_clause = rmap_dict["set"]
                        for criteria in set_clause.keys():
                            for path in show_bgp_json["paths"]:
                                if criteria not in path:
                                    continue

                                if criteria == "aspath":
                                    value = path[criteria]["string"]
                                else:
                                    value = path[criteria]

                                if set_clause[criteria] == value:
                                    logger.info(
                                        "Verifying BGP "
                                        "attribute {} for"
                                        " route: {} in "
                                        "router: {}, found"
                                        " expected value:"
                                        " {}".format(
                                            criteria,
                                            static_route,
                                            dut,
                                            value,
                                        )
                                    )
                                    break
                            else:
                                # no path carried the configured value
                                return (
                                    "Failed: Verifying BGP "
                                    "attribute {} for route:"
                                    " {} in router: {}, "
                                    " expected value: {} but"
                                    " found: {}".format(
                                        criteria,
                                        static_route,
                                        dut,
                                        set_clause[criteria],
                                        value,
                                    )
                                )

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return True
@retry(retry_timeout=8)
def verify_best_path_as_per_bgp_attribute(
    tgen, addr_type, router, input_dict, attribute, expected=True
):
    """
    API is to verify best path according to BGP attributes for given routes.
    "show bgp ipv4/6 json" command will be run and verify best path according
    to shortest as-path, highest local-preference and weight, lowest med and
    route origin IGP>EGP>INCOMPLETE.

    Parameters
    ----------
    * `tgen` : topogen object
    * `addr_type` : ip type, ipv4/ipv6
    * `router` : Device Under Test
    * `attribute` : calculate best path using this attribute, one of
                    "path", "locPrf", "weight", "origin", "metric"
    * `input_dict`: defines different routes to calculate for which route
                    best path is selected
    * `expected` : expected results from API, by-default True
                   (not read in this body; presumably consumed by @retry)

    Usage
    -----
    # To verify best path for routes 200.50.2.0/32 and 200.60.2.0/32 from
    router r7 to router r1(DUT) as per local-preference attribute
    input_dict = {
        "r7": {
            "bgp": {
                "address_family": {
                    "ipv4": {
                        "unicast": {
                            "advertise_networks": [
                                {
                                    "network": "200.50.2.0/32"
                                },
                                {
                                    "network": "200.60.2.0/32"
                                }
                            ]
                        }
                    }
                }
            }
        }
    }
    attribute = "locPrf"
    result = verify_best_path_as_per_bgp_attribute(tgen, "ipv4", dut, \
                         input_dict,  attribute)

    Returns
    -------
    errormsg(str) or True; False when `router` is not part of the topology
    """

    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))

    if router not in tgen.routers():
        return False

    rnode = tgen.routers()[router]

    # Verifying show bgp json
    command = "show bgp"

    sleep(2)
    logger.info("Verifying router %s RIB for best path:", router)

    for route_val in input_dict.values():
        if "static_routes" in route_val:
            networks = route_val["static_routes"]
        else:
            net_data = route_val["bgp"]["address_family"][addr_type]["unicast"]
            networks = net_data["advertise_networks"]

        for network in networks:
            _network = network["network"]
            # NOTE: setdefault writes the defaults back into input_dict
            no_of_ip = network.setdefault("no_of_ip", 1)
            vrf = network.setdefault("vrf", None)

            if vrf:
                cmd = "{} vrf {}".format(command, vrf)
            else:
                cmd = command

            cmd = "{} {}".format(cmd, addr_type)
            cmd = "{} json".format(cmd)
            sh_ip_bgp_json = run_frr_cmd(rnode, cmd, isjson=True)

            routes = generate_ips(_network, no_of_ip)
            for route in routes:
                route = str(ipaddress.ip_network(frr_unicode(route)))

                if route in sh_ip_bgp_json["routes"]:
                    route_attributes = sh_ip_bgp_json["routes"][route]
                    _next_hop = None
                    compare = None
                    # nexthop ip -> value of `attribute` on that path
                    attribute_dict = {}
                    for route_attribute in route_attributes:
                        next_hops = route_attribute["nexthops"]
                        for next_hop in next_hops:
                            next_hop_ip = next_hop["ip"]
                        attribute_dict[next_hop_ip] = route_attribute[attribute]

                    # AS_PATH attribute
                    if attribute == "path":
                        # Find next_hop for the route have minimum as_path
                        _next_hop = min(
                            attribute_dict, key=lambda x: len(set(attribute_dict[x]))
                        )
                        compare = "SHORTEST"

                    # LOCAL_PREF attribute
                    elif attribute == "locPrf":
                        # Find next_hop for the route have highest local preference
                        _next_hop = max(
                            attribute_dict, key=(lambda k: attribute_dict[k])
                        )
                        compare = "HIGHEST"

                    # WEIGHT attribute
                    elif attribute == "weight":
                        # Find next_hop for the route have highest weight
                        _next_hop = max(
                            attribute_dict, key=(lambda k: attribute_dict[k])
                        )
                        compare = "HIGHEST"

                    # ORIGIN attribute
                    elif attribute == "origin":
                        # Find next_hop for the route have IGP as origin, -
                        # - rule is IGP>EGP>INCOMPLETE
                        _next_hop = [
                            key
                            for (key, value) in attribute_dict.items()
                            if value == "IGP"
                        ][0]
                        compare = ""

                    # MED  attribute
                    elif attribute == "metric":
                        # Find next_hop for the route have LOWEST MED
                        _next_hop = min(
                            attribute_dict, key=(lambda k: attribute_dict[k])
                        )
                        compare = "LOWEST"

                    # Show ip route
                    if addr_type == "ipv4":
                        command_1 = "show ip route"
                    else:
                        command_1 = "show ipv6 route"

                    if vrf:
                        cmd = "{} vrf {} json".format(command_1, vrf)
                    else:
                        cmd = "{} json".format(command_1)

                    rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True)

                    # Verifying output dictionary rib_routes_json is not empty
                    if not bool(rib_routes_json):
                        errormsg = "No route found in RIB of router {}..".format(router)
                        return errormsg

                    st_found = False
                    nh_found = False
                    # Find best is installed in RIB
                    if route in rib_routes_json:
                        st_found = True
                        rib_next_hop = rib_routes_json[route][0]["nexthops"][0]["ip"]
                        # Verify next_hop in rib_routes_json
                        # NOTE(review): the comparison line is missing from
                        # the reviewed text; membership in the BGP nexthops
                        # is assumed - confirm against the original file.
                        if rib_next_hop in attribute_dict:
                            nh_found = True
                        else:
                            # Expected is the nexthop computed from BGP,
                            # Found is what the RIB actually installed.
                            errormsg = (
                                "Incorrect Nexthop for BGP route {} in "
                                "RIB of router {}, Expected: {}, Found:"
                                " {}\n".format(
                                    route,
                                    router,
                                    _next_hop,
                                    rib_next_hop,
                                )
                            )
                            return errormsg

                    if st_found and nh_found:
                        logger.info(
                            "Best path for prefix: %s with next_hop: %s is "
                            "installed according to %s %s: (%s) in RIB of "
                            "router %s",
                            route,
                            _next_hop,
                            compare,
                            attribute,
                            attribute_dict[_next_hop],
                            router,
                        )

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return True
@retry(retry_timeout=10)
def verify_best_path_as_per_admin_distance(
    tgen, addr_type, router, input_dict, attribute, expected=True, vrf=None
):
    """
    API is to verify best path according to admin distance for given
    route. "show ip/ipv6 route json" command will be run and verify
    best path according to shortest admin distance.

    Parameters
    ----------
    * `tgen` : topogen object
    * `addr_type` : ip type, ipv4/ipv6
    * `router`: Device Under Test, whose RIB must hold the best path
    * `input_dict`: defines different routes with different admin distance
                    to calculate for which route best path is selected
    * `attribute` : label of the compared attribute, only used in the
                    success log message
    * `expected` : expected results from API, by-default True. Not read by
                   this body; presumably consumed by the @retry wrapper
                   (TODO confirm)
    * `vrf`: Pass vrf name check for particular vrf.

    Usage
    -----
    # To verify best path for route 200.50.2.0/32 from router r2 to
    # router r1(DUT) as per shortest admin distance which is 60.
    input_dict = {
        "r2": {
            "static_routes": [
                {"network": "200.50.2.0/32",
                 "admin_distance": 80, "next_hop": "10.0.0.14"},
                {"network": "200.50.2.0/32",
                 "admin_distance": 60, "next_hop": "10.0.0.18"}]
        }}
    attribute = "locPrf"
    result = verify_best_path_as_per_admin_distance(tgen, "ipv4", dut,
                                                    input_dict, attribute)

    Returns
    -------
    errormsg(str) or True (False when `router` is not part of the topology)
    """

    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
    router_list = tgen.routers()
    if router not in router_list:
        return False

    rnode = router_list[router]

    sleep(2)
    logger.info("Verifying router %s RIB for best path:", router)

    # Show ip route cmd
    if addr_type == "ipv4":
        command = "show ip route"
    else:
        command = "show ipv6 route"

    if vrf:
        command = "{} vrf {} json".format(command, vrf)
    else:
        command = "{} json".format(command)

    for routes_from_router in input_dict.keys():
        sh_ip_route_json = router_list[routes_from_router].vtysh_cmd(
            command, isjson=True
        )
        networks = input_dict[routes_from_router]["static_routes"]
        for network in networks:
            route = network["network"]

            # Report a not-yet-installed route as an error message instead of
            # raising KeyError, so the retry wrapper can poll again.
            if not sh_ip_route_json or route not in sh_ip_route_json:
                errormsg = "Route {} not found in RIB of router {}\n".format(
                    route, routes_from_router
                )
                return errormsg

            # Map each nexthop address to the admin distance of its entry.
            attribute_dict = {}
            for route_attribute in sh_ip_route_json[route]:
                for next_hop in route_attribute.get("nexthops", []):
                    next_hop_ip = next_hop.get("ip")
                    if next_hop_ip is None:
                        # Nexthop entries without an address cannot be compared.
                        continue
                    attribute_dict[next_hop_ip] = route_attribute["distance"]

            if not attribute_dict:
                errormsg = "No nexthop found for route {} in RIB of router {}\n".format(
                    route, routes_from_router
                )
                return errormsg

            # Find next_hop for the route have LOWEST Admin Distance
            _next_hop = min(attribute_dict, key=attribute_dict.get)
            compare = "LOWEST"

            # Show ip route
            rib_routes_json = run_frr_cmd(rnode, command, isjson=True)

            # Verifying output dictionary rib_routes_json is not empty
            if not bool(rib_routes_json):
                errormsg = "No route found in RIB of router {}..".format(router)
                return errormsg

            # Find best is installed in RIB
            # NOTE(review): a route absent from the DUT RIB is silently
            # accepted here, exactly as before - confirm this is intended.
            if route in rib_routes_json:
                # Verify next_hop in rib_routes_json
                installed = any(
                    nh.get("ip") == _next_hop
                    for nh in rib_routes_json[route][0]["nexthops"]
                )
                if not installed:
                    errormsg = (
                        "Nexthop {} is Missing for BGP route {}"
                        " in RIB of router {}\n".format(_next_hop, route, router)
                    )
                    return errormsg

                logger.info(
                    "Best path for prefix: %s is installed according"
                    " to %s %s: (%s) in RIB of router %s",
                    route,
                    compare,
                    attribute,
                    attribute_dict[_next_hop],
                    router,
                )

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return True
@retry(retry_timeout=30)
def verify_bgp_rib(
    tgen,
    addr_type,
    dut,
    input_dict,
    next_hop=None,
    aspath=None,
    multi_nh=None,
    expected=True,
):
    """
    This API is to verify whether bgp rib has any
    matching route for a nexthop.

    Parameters
    ----------
    * `tgen`: topogen object
    * `addr_type` : ip type ipv4/ipv6
    * `dut`: input dut router name
    * `input_dict` : input dict, has details of static routes
    * `next_hop`[optional]: next_hop (single value or list) which needs to
                            be verified
    * 'aspath'[optional]: aspath which needs to be verified
    * `multi_nh`[optional]: when truthy, nexthops are collected from every
                            path of the route instead of only the first one
    * `expected` : expected results from API, by-default True. Not read by
                   this body; presumably consumed by the @retry wrapper
                   (TODO confirm)

    Usage
    -----
    dut = 'r1'
    next_hop = "192.168.1.10"
    input_dict = topo['routers']
    aspath = "100 200 300"
    result = verify_bgp_rib(tgen, addr_type, dut, input_dict,
                            next_hop, aspath)

    Returns
    -------
    errormsg(str) or True
    """

    logger.debug("Entering lib API: verify_bgp_rib()")

    router_list = tgen.routers()

    # Normalise the expected nexthops once. Previously the expected list was
    # only filled when a scalar was passed, so a caller-supplied list was
    # compared as an empty set and always "matched".
    if next_hop and not isinstance(next_hop, list):
        next_hop = [next_hop]

    for routerInput in input_dict.keys():
        for router, rnode in router_list.items():
            if router != dut:
                continue

            # Verifying RIB routes
            command = "show bgp"

            # Static routes
            sleep(2)
            logger.info("Checking router {} BGP RIB:".format(dut))

            if "static_routes" in input_dict[routerInput]:
                static_routes = input_dict[routerInput]["static_routes"]

                for static_route in static_routes:
                    found_routes = []
                    missing_routes = []
                    nh_found = False

                    vrf = static_route.setdefault("vrf", None)
                    community = static_route.setdefault("community", None)
                    largeCommunity = static_route.setdefault("largeCommunity", None)

                    # NOTE(review): community filters are only applied
                    # together with a vrf, as before - confirm intent.
                    if vrf:
                        cmd = "{} vrf {} {}".format(command, vrf, addr_type)

                        if community:
                            cmd = "{} community {}".format(cmd, community)

                        if largeCommunity:
                            cmd = "{} large-community {}".format(cmd, largeCommunity)
                    else:
                        cmd = "{} {}".format(command, addr_type)

                    cmd = "{} json".format(cmd)

                    rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True)

                    # Verifying output dictionary rib_routes_json is not empty
                    if not rib_routes_json:
                        errormsg = "No route found in rib of router {}..".format(router)
                        return errormsg

                    network = static_route["network"]
                    no_of_ip = static_route.get("no_of_ip", 1)

                    # Generating IPs for verification
                    ip_list = generate_ips(network, no_of_ip)

                    for st_rt in ip_list:
                        st_rt = str(ipaddress.ip_network(frr_unicode(st_rt)))

                        _addr_type = validate_ip_address(st_rt)
                        if _addr_type != addr_type:
                            continue

                        if st_rt not in rib_routes_json["routes"]:
                            missing_routes.append(st_rt)
                            continue

                        found_routes.append(st_rt)
                        route_paths = rib_routes_json["routes"][st_rt]

                        if next_hop and multi_nh:
                            # Nexthops of every path, rebuilt per route so that
                            # one route's nexthops never satisfy another's.
                            hops_per_path = [
                                [rib_r["ip"] for rib_r in path["nexthops"]]
                                for path in route_paths
                            ]
                            found_hops = [
                                nh for path_hops in hops_per_path for nh in path_hops
                            ]
                            if hops_per_path and hops_per_path[0] and found_hops:
                                additional_nexthops_in_required_nhs = set(
                                    next_hop
                                ).difference(found_hops)

                                if additional_nexthops_in_required_nhs:
                                    logger.info(
                                        "Missing nexthop %s for route"
                                        " %s in RIB of router %s\n",
                                        additional_nexthops_in_required_nhs,
                                        st_rt,
                                        dut,
                                    )
                                    # NOTE(review): a missing nexthop ends the
                                    # check with success in multi_nh mode, as
                                    # before - confirm this is intended.
                                    return True
                                nh_found = True

                        elif next_hop and multi_nh is None:
                            found_hops = [
                                rib_r["ip"] for rib_r in route_paths[0]["nexthops"]
                            ]
                            if found_hops:
                                additional_nexthops_in_required_nhs = set(
                                    next_hop
                                ).difference(found_hops)

                                if additional_nexthops_in_required_nhs:
                                    logger.info(
                                        "Missing nexthop %s for route"
                                        " %s in RIB of router %s\n",
                                        additional_nexthops_in_required_nhs,
                                        st_rt,
                                        dut,
                                    )
                                    errormsg = (
                                        "Nexthop {} is Missing for "
                                        "route {} in RIB of router {}\n".format(
                                            additional_nexthops_in_required_nhs,
                                            st_rt,
                                            dut,
                                        )
                                    )
                                    return errormsg
                                nh_found = True

                        if aspath:
                            found_paths = route_paths[0]["path"]
                            if aspath == found_paths:
                                logger.info(
                                    "Found AS path {} for route"
                                    " {} in RIB of router "
                                    "{}\n".format(aspath, st_rt, dut)
                                )
                            else:
                                errormsg = (
                                    "AS Path {} is missing "
                                    "for route {} in RIB of router {}\n".format(
                                        aspath, st_rt, dut
                                    )
                                )
                                return errormsg

                    if nh_found:
                        logger.info(
                            "Found next_hop {} for all bgp"
                            " routes in RIB of"
                            " router {}\n".format(next_hop, router)
                        )

                    if missing_routes:
                        errormsg = (
                            "Missing route in RIB of router {}, "
                            "routes: {}\n".format(dut, missing_routes)
                        )
                        return errormsg

                    if found_routes:
                        logger.info(
                            "Verified routes in router {} BGP RIB, "
                            "found routes are: {} \n".format(dut, found_routes)
                        )
                # Static routes take precedence: the advertise_networks check
                # below is skipped for this input router.
                continue

            if "bgp" not in input_dict[routerInput]:
                continue

            # Advertise networks
            bgp_data_list = input_dict[routerInput]["bgp"]

            if not isinstance(bgp_data_list, list):
                bgp_data_list = [bgp_data_list]

            for bgp_data in bgp_data_list:
                vrf_id = bgp_data.setdefault("vrf", None)
                if vrf_id:
                    cmd = "{} vrf {} {}".format(command, vrf_id, addr_type)
                else:
                    cmd = "{} {}".format(command, addr_type)

                cmd = "{} json".format(cmd)

                rib_routes_json = run_frr_cmd(rnode, cmd, isjson=True)

                # Verifying output dictionary rib_routes_json is not empty
                if not rib_routes_json:
                    errormsg = "No route found in rib of router {}..".format(router)
                    return errormsg

                bgp_net_advertise = bgp_data["address_family"][addr_type]["unicast"]
                advertise_network = bgp_net_advertise.setdefault(
                    "advertise_networks", []
                )

                for advertise_network_dict in advertise_network:
                    found_routes = []
                    missing_routes = []

                    network = advertise_network_dict["network"]
                    no_of_network = advertise_network_dict.get("no_of_network", 1)

                    # Generating IPs for verification
                    ip_list = generate_ips(network, no_of_network)

                    for st_rt in ip_list:
                        st_rt = str(ipaddress.ip_network(frr_unicode(st_rt)))

                        _addr_type = validate_ip_address(st_rt)
                        if _addr_type != addr_type:
                            continue

                        if st_rt in rib_routes_json["routes"]:
                            found_routes.append(st_rt)
                        else:
                            missing_routes.append(st_rt)

                    if missing_routes:
                        errormsg = (
                            "Missing route in BGP RIB of router {},"
                            " are: {}\n".format(dut, missing_routes)
                        )
                        return errormsg

                    if found_routes:
                        logger.info(
                            "Verified routes in router {} BGP RIB, found "
                            "routes are: {}\n".format(dut, found_routes)
                        )

    logger.debug("Exiting lib API: verify_bgp_rib()")
    return True
def _gr_mode_from_dest_links(dest_links):
    """Return the GR mode set on the neighbor's dest_link entries, or None.

    The last dest_link entry decides; an entry carrying none of the three
    graceful-restart keys resets the result to None.
    """
    mode = None
    for data in dest_links.values():
        if data.get("graceful-restart-helper"):
            mode = "Helper"
        elif data.get("graceful-restart"):
            mode = "Restart"
        elif data.get("graceful-restart-disable"):
            mode = "Disable"
        else:
            mode = None
    return mode


def _gr_mode_from_global(bgp_config, suffix=""):
    """Return the GR mode from the router-level "graceful-restart" block.

    `suffix` is appended to the mode name; "Helper" is the fallback mode.
    """
    if "graceful-restart" in bgp_config:
        gr_config = bgp_config["graceful-restart"]
        if "graceful-restart" in gr_config and gr_config["graceful-restart"]:
            return "Restart" + suffix
        if (
            "graceful-restart-disable" in gr_config
            and gr_config["graceful-restart-disable"]
        ):
            return "Disable" + suffix
    return "Helper" + suffix


@retry(retry_timeout=10)
def verify_graceful_restart(
    tgen, topo, addr_type, input_dict, dut, peer, expected=True
):
    """
    This API is to verify verify_graceful_restart configuration of DUT and
    cross verify the same from the peer bgp router.

    Parameters
    ----------
    * `tgen`: topogen object
    * `topo`: input json file data
    * `addr_type` : ip type ipv4/ipv6
    * `input_dict`: input dictionary, have details of Device Under Test, for
                    which user wants to test the data
    * `dut`: input dut router name
    * `peer`: input peer router name
    * `expected` : expected results from API, by-default True. Not read by
                   this body; presumably consumed by the @retry wrapper
                   (TODO confirm)

    Usage
    -----
    input_dict carries, for both `dut` and `peer`, either a per-neighbor
    dest_link entry such as {"graceful-restart": True} under
    bgp -> address_family -> <addr_type> -> unicast -> neighbor, or a
    router-level "graceful-restart" block under bgp.

    result = verify_graceful_restart(tgen, topo, addr_type, input_dict,
                                     dut = "r1", peer = 'r2')

    Returns
    -------
    errormsg(str) or True
    """

    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))

    for router, rnode in tgen.routers().items():
        if router != dut:
            continue

        # "bgp" may be a single dict or a list of per-vrf dicts.
        try:
            bgp_addr_type = topo["routers"][dut]["bgp"]["address_family"]
        except TypeError:
            bgp_addr_type = topo["routers"][dut]["bgp"][0]["address_family"]

        if addr_type not in bgp_addr_type or not check_address_types(addr_type):
            continue

        bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]

        # Resolve the peer's address on the link facing the DUT.
        neighbor_ip = None
        for bgp_neighbor, peer_data in bgp_neighbors.items():
            if bgp_neighbor != peer:
                continue

            peer_links = topo["routers"][bgp_neighbor]["links"]
            for dest_link in peer_data["dest_link"]:
                if dest_link in peer_links:
                    neighbor_ip = peer_links[dest_link][addr_type].split("/")[0]

                    logger.info(
                        "[DUT: {}]: Checking bgp graceful-restart show"
                        " o/p {}".format(dut, neighbor_ip)
                    )

        # Previously an unresolved neighbor surfaced as a NameError below.
        if neighbor_ip is None:
            errormsg = "[DUT: {}]: No {} link address found for peer {}".format(
                dut, addr_type, peer
            )
            return errormsg

        show_bgp_graceful_json = run_frr_cmd(
            rnode,
            "show bgp {} neighbor {} graceful-restart json".format(
                addr_type, neighbor_ip
            ),
            isjson=True,
        )

        if not show_bgp_graceful_json or neighbor_ip not in show_bgp_graceful_json:
            errormsg = "[DUT: {}]: Neighbor {} missing in show o/p".format(
                dut, neighbor_ip
            )
            return errormsg

        show_bgp_graceful_json_out = show_bgp_graceful_json[neighbor_ip]

        if show_bgp_graceful_json_out.get("neighborAddr") == neighbor_ip:
            logger.info(
                "[DUT: {}]: Neighbor ip matched {}".format(dut, neighbor_ip)
            )
        else:
            errormsg = "[DUT: {}]: Neighbor ip NOT a matched {}".format(
                dut, neighbor_ip
            )
            return errormsg

        # Local GR mode: the neighbor-level setting wins, otherwise fall back
        # to the router-level one (reported with a trailing "*").
        lmode = None
        dut_bgp = input_dict[dut]["bgp"]
        if "address_family" in dut_bgp:
            lmode = _gr_mode_from_dest_links(
                dut_bgp["address_family"][addr_type]["unicast"]["neighbor"][peer][
                    "dest_link"
                ]
            )
        if lmode is None:
            lmode = _gr_mode_from_global(dut_bgp, suffix="*")

        # Nothing further is compared when GR is disabled locally.
        if lmode in ("Disable", "Disable*"):
            return True

        # Remote GR mode: same precedence, without the "*" marker.
        rmode = None
        peer_bgp = input_dict[peer]["bgp"]
        if "address_family" in peer_bgp:
            rmode = _gr_mode_from_dest_links(
                peer_bgp["address_family"][addr_type]["unicast"]["neighbor"][dut][
                    "dest_link"
                ]
            )
        if rmode is None:
            rmode = _gr_mode_from_global(peer_bgp)

        found_lmode = show_bgp_graceful_json_out["localGrMode"]
        if found_lmode == lmode:
            logger.info("[DUT: {}]: localGrMode : {} ".format(dut, found_lmode))
        else:
            errormsg = (
                "[DUT: {}]: localGrMode is not correct"
                " Expected: {}, Found: {}".format(dut, lmode, found_lmode)
            )
            return errormsg

        found_rmode = show_bgp_graceful_json_out["remoteGrMode"]
        # A peer with GR disabled is reported as "NotApplicable".
        if found_rmode == rmode or (
            found_rmode == "NotApplicable" and rmode == "Disable"
        ):
            logger.info("[DUT: {}]: remoteGrMode : {} ".format(dut, found_rmode))
        else:
            errormsg = (
                "[DUT: {}]: remoteGrMode is not correct"
                " Expected: {}, Found: {}".format(dut, rmode, found_rmode)
            )
            return errormsg

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return True
@retry(retry_timeout=10)
def verify_r_bit(tgen, topo, addr_type, input_dict, dut, peer, expected=True):
    """
    This API is to verify r_bit in the BGP gr capability advertised
    by the neighbor router

    Parameters
    ----------
    * `tgen`: topogen object
    * `topo`: input json file data
    * `addr_type` : ip type ipv4/ipv6
    * `input_dict`: input dictionary, have details of Device Under Test, for
                    which user wants to test the data (not read by this body)
    * `dut`: input dut router name
    * `peer`: peer name
    * `expected` : expected results from API, by-default True. Not read by
                   this body; presumably consumed by the @retry wrapper
                   (TODO confirm)

    Usage
    -----
    result = verify_r_bit(tgen, topo, addr_type, input_dict, dut, peer)

    Returns
    -------
    errormsg(str) or True
    """

    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))

    for router, rnode in tgen.routers().items():
        if router != dut:
            continue

        bgp_addr_type = topo["routers"][router]["bgp"]["address_family"]

        if addr_type not in bgp_addr_type or not check_address_types(addr_type):
            continue

        bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]

        # Resolve the peer's address on the link facing the DUT.
        neighbor_ip = None
        for bgp_neighbor, peer_data in bgp_neighbors.items():
            if bgp_neighbor != peer:
                continue

            peer_links = topo["routers"][bgp_neighbor]["links"]
            for dest_link in peer_data["dest_link"]:
                if dest_link in peer_links:
                    neighbor_ip = peer_links[dest_link][addr_type].split("/")[0]

                    logger.info(
                        "[DUT: {}]: Checking bgp graceful-restart show"
                        " o/p {}".format(dut, neighbor_ip)
                    )

        # Previously an unresolved neighbor surfaced as a NameError below.
        if neighbor_ip is None:
            errormsg = "[DUT: {}]: No {} link address found for peer {}".format(
                dut, addr_type, peer
            )
            return errormsg

        show_bgp_graceful_json = run_frr_cmd(
            rnode,
            "show bgp {} neighbor {} graceful-restart json".format(
                addr_type, neighbor_ip
            ),
            isjson=True,
        )

        if not show_bgp_graceful_json or neighbor_ip not in show_bgp_graceful_json:
            errormsg = "[DUT: {}]: Neighbor {} missing in show o/p".format(
                dut, neighbor_ip
            )
            return errormsg

        show_bgp_graceful_json_out = show_bgp_graceful_json[neighbor_ip]

        if show_bgp_graceful_json_out.get("neighborAddr") == neighbor_ip:
            logger.info(
                "[DUT: {}]: Neighbor ip matched {}".format(dut, neighbor_ip)
            )
        else:
            errormsg = "[DUT: {}]: Neighbor ip NOT a matched {}".format(
                dut, neighbor_ip
            )
            return errormsg

        # An output without the "rBit" key is accepted, as before.
        if "rBit" in show_bgp_graceful_json_out:
            if show_bgp_graceful_json_out["rBit"]:
                logger.info("[DUT: {}]: Rbit true {}".format(dut, neighbor_ip))
            else:
                errormsg = "[DUT: {}]: Rbit false {}".format(dut, neighbor_ip)
                return errormsg

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return True
@retry(retry_timeout=10)
def verify_eor(tgen, topo, addr_type, input_dict, dut, peer, expected=True):
    """
    This API is to verify EOR

    Parameters
    ----------
    * `tgen`: topogen object
    * `topo`: input json file data
    * `addr_type` : ip type ipv4/ipv6
    * `input_dict`: input dictionary, have details of DUT, for
                    which user wants to test the data (not read by this body)
    * `dut`: input dut router name
    * `peer`: peer name
    * `expected` : expected results from API, by-default True. Not read by
                   this body; presumably consumed by the @retry wrapper
                   (TODO confirm)

    Usage
    -----
    result = verify_eor(tgen, topo, addr_type, input_dict, dut, peer)

    Returns
    -------
    errormsg(str) or True
    """
    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))

    for router, rnode in tgen.routers().items():
        if router != dut:
            continue

        bgp_addr_type = topo["routers"][router]["bgp"]["address_family"]

        if addr_type not in bgp_addr_type or not check_address_types(addr_type):
            continue

        bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]

        # Resolve the peer's address on the link facing the DUT.
        neighbor_ip = None
        for bgp_neighbor, peer_data in bgp_neighbors.items():
            if bgp_neighbor != peer:
                continue

            peer_links = topo["routers"][bgp_neighbor]["links"]
            for dest_link in peer_data["dest_link"]:
                if dest_link in peer_links:
                    neighbor_ip = peer_links[dest_link][addr_type].split("/")[0]

                    logger.info(
                        "[DUT: %s]: Checking bgp graceful-restart show o/p %s",
                        dut,
                        neighbor_ip,
                    )

        # Previously an unresolved neighbor surfaced as a NameError below.
        if neighbor_ip is None:
            errormsg = "[DUT: %s]: No %s link address found for peer %s" % (
                dut,
                addr_type,
                peer,
            )
            return errormsg

        show_bgp_graceful_json = run_frr_cmd(
            rnode,
            "show bgp {} neighbor {} graceful-restart json".format(
                addr_type, neighbor_ip
            ),
            isjson=True,
        )

        if not show_bgp_graceful_json or neighbor_ip not in show_bgp_graceful_json:
            errormsg = "[DUT: %s]: Neighbor %s missing in show o/p" % (
                dut,
                neighbor_ip,
            )
            return errormsg

        show_bgp_graceful_json_out = show_bgp_graceful_json[neighbor_ip]

        if show_bgp_graceful_json_out.get("neighborAddr") == neighbor_ip:
            logger.info("[DUT: %s]: Neighbor ip matched %s", dut, neighbor_ip)
        else:
            errormsg = "[DUT: %s]: Neighbor ip is NOT matched %s" % (
                dut,
                neighbor_ip,
            )
            return errormsg

        if addr_type == "ipv4":
            afi = "ipv4Unicast"
        elif addr_type == "ipv6":
            afi = "ipv6Unicast"
        else:
            errormsg = "Address type %s is not supported" % (addr_type)
            return errormsg

        # Report a missing AFI or EOR section as an error message instead of
        # raising KeyError, so the retry wrapper can poll again.
        afi_json = show_bgp_graceful_json_out.get(afi)
        if not afi_json or "endOfRibStatus" not in afi_json:
            errormsg = "[DUT: %s]: EOR status not present for %s %s" % (
                dut,
                neighbor_ip,
                afi,
            )
            return errormsg

        eor_json = afi_json["endOfRibStatus"]
        if "endOfRibSend" in eor_json:
            if eor_json["endOfRibSend"]:
                logger.info(
                    "[DUT: %s]: EOR Send true for %s %s", dut, neighbor_ip, afi
                )
            else:
                errormsg = "[DUT: %s]: EOR Send false for %s %s" % (
                    dut,
                    neighbor_ip,
                    afi,
                )
                return errormsg

        if "endOfRibRecv" in eor_json:
            if eor_json["endOfRibRecv"]:
                logger.info("[DUT: %s]: EOR Recv true %s %s", dut, neighbor_ip, afi)
            else:
                errormsg = "[DUT: %s]: EOR Recv false %s %s" % (
                    dut,
                    neighbor_ip,
                    afi,
                )
                return errormsg

        if "endOfRibSentAfterUpdate" in eor_json:
            if eor_json["endOfRibSentAfterUpdate"]:
                logger.info(
                    "[DUT: %s]: EOR SendTime true for %s %s",
                    dut,
                    neighbor_ip,
                    afi,
                )
            else:
                errormsg = "[DUT: %s]: EOR SendTime false for %s %s" % (
                    dut,
                    neighbor_ip,
                    afi,
                )
                return errormsg

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return True
@retry(retry_timeout=8)
def verify_f_bit(tgen, topo, addr_type, input_dict, dut, peer, expected=True):
    """
    This API is to verify f_bit in the BGP gr capability advertised
    by the neighbor router

    Parameters
    ----------
    * `tgen`: topogen object
    * `topo`: input json file data
    * `addr_type` : ip type ipv4/ipv6
    * `input_dict`: input dictionary, have details of Device Under Test, for
                    which user wants to test the data (not read by this body)
    * `dut`: input dut router name
    * `peer`: peer name
    * `expected` : expected results from API, by-default True. Not read by
                   this body; presumably consumed by the @retry wrapper
                   (TODO confirm)

    Usage
    -----
    result = verify_f_bit(tgen, topo, 'ipv4', input_dict, dut, peer)

    Returns
    -------
    errormsg(str) or True
    """

    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))

    for router, rnode in tgen.routers().items():
        if router != dut:
            continue

        bgp_addr_type = topo["routers"][router]["bgp"]["address_family"]

        if addr_type not in bgp_addr_type or not check_address_types(addr_type):
            continue

        bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]

        # Resolve the peer's address on the link facing the DUT.
        neighbor_ip = None
        for bgp_neighbor, peer_data in bgp_neighbors.items():
            if bgp_neighbor != peer:
                continue

            peer_links = topo["routers"][bgp_neighbor]["links"]
            for dest_link in peer_data["dest_link"]:
                if dest_link in peer_links:
                    neighbor_ip = peer_links[dest_link][addr_type].split("/")[0]

                    logger.info(
                        "[DUT: {}]: Checking bgp graceful-restart show"
                        " o/p {}".format(dut, neighbor_ip)
                    )

        # Previously an unresolved neighbor surfaced as a NameError below.
        if neighbor_ip is None:
            errormsg = "[DUT: {}]: No {} link address found for peer {}".format(
                dut, addr_type, peer
            )
            return errormsg

        show_bgp_graceful_json = run_frr_cmd(
            rnode,
            "show bgp {} neighbor {} graceful-restart json".format(
                addr_type, neighbor_ip
            ),
            isjson=True,
        )

        if not show_bgp_graceful_json or neighbor_ip not in show_bgp_graceful_json:
            errormsg = "[DUT: {}]: Neighbor {} missing in show o/p".format(
                dut, neighbor_ip
            )
            return errormsg

        show_bgp_graceful_json_out = show_bgp_graceful_json[neighbor_ip]

        if show_bgp_graceful_json_out.get("neighborAddr") == neighbor_ip:
            logger.info(
                "[DUT: {}]: Neighbor ip matched {}".format(dut, neighbor_ip)
            )
        else:
            errormsg = "[DUT: {}]: Neighbor ip NOT a match {}".format(
                dut, neighbor_ip
            )
            return errormsg

        # NOTE(review): ipv4Unicast is looked at first regardless of
        # addr_type, and ipv6Unicast only when ipv4Unicast is absent - this
        # precedence is unchanged; confirm it is intended.
        if "ipv4Unicast" in show_bgp_graceful_json_out:
            if show_bgp_graceful_json_out["ipv4Unicast"]["fBit"]:
                logger.info(
                    "[DUT: {}]: Fbit True for {} IPv4"
                    " Unicast".format(dut, neighbor_ip)
                )
            else:
                errormsg = "[DUT: {}]: Fbit False for {} IPv4 Unicast".format(
                    dut, neighbor_ip
                )
                return errormsg

        elif "ipv6Unicast" in show_bgp_graceful_json_out:
            if show_bgp_graceful_json_out["ipv6Unicast"]["fBit"]:
                logger.info(
                    "[DUT: {}]: Fbit True for {} IPv6"
                    " Unicast".format(dut, neighbor_ip)
                )
            else:
                errormsg = "[DUT: {}]: Fbit False for {} IPv6 Unicast".format(
                    dut, neighbor_ip
                )
                return errormsg
        else:
            # The old code evaluated the missing keys here only to provoke a
            # KeyError; report the condition explicitly instead.
            errormsg = (
                "[DUT: {}]: Neither ipv4Unicast nor ipv6Unicast present"
                " for {}".format(dut, neighbor_ip)
            )
            return errormsg

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return True
@retry(retry_timeout=10)
def verify_graceful_restart_timers(tgen, topo, addr_type, input_dict, dut, peer):
    """
    This API is to verify graceful restart timers, configured and received

    Parameters
    ----------
    * `tgen`: topogen object
    * `topo`: input json file data
    * `addr_type` : ip type ipv4/ipv6
    * `input_dict`: input dictionary, have details of Device Under Test,
                    for which user wants to test the data
    * `dut`: input dut router name
    * `peer`: peer name

    Usage
    -----
    # The restart timer is read from
    # input_dict[peer]["bgp"]["graceful-restart"]["timer"]["restart-time"]
    input_dict = {
        "r2": {
            "bgp": {
                "graceful-restart": {"timer": {"restart-time": 150}}
            }
        }
    }

    result = verify_graceful_restart_timers(tgen, topo, 'ipv4', input_dict,
                                            dut, peer)

    Returns
    -------
    errormsg(str) or True
    """

    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))

    for router, rnode in tgen.routers().items():
        if router != dut:
            continue

        bgp_addr_type = topo["routers"][dut]["bgp"]["address_family"]

        if addr_type not in bgp_addr_type or not check_address_types(addr_type):
            continue

        bgp_neighbors = bgp_addr_type[addr_type]["unicast"]["neighbor"]

        # Resolve the peer's address on the link facing the DUT.
        neighbor_ip = None
        for bgp_neighbor, peer_data in bgp_neighbors.items():
            if bgp_neighbor != peer:
                continue

            peer_links = topo["routers"][bgp_neighbor]["links"]
            for dest_link in peer_data["dest_link"]:
                if dest_link in peer_links:
                    neighbor_ip = peer_links[dest_link][addr_type].split("/")[0]

                    logger.info(
                        "[DUT: {}]: Checking bgp graceful-restart show"
                        " o/p {}".format(dut, neighbor_ip)
                    )

        # Previously an unresolved neighbor surfaced as a NameError below.
        if neighbor_ip is None:
            errormsg = "[DUT: {}]: No {} link address found for peer {}".format(
                dut, addr_type, peer
            )
            return errormsg

        show_bgp_graceful_json = run_frr_cmd(
            rnode,
            "show bgp {} neighbor {} graceful-restart json".format(
                addr_type, neighbor_ip
            ),
            isjson=True,
        )

        if not show_bgp_graceful_json or neighbor_ip not in show_bgp_graceful_json:
            errormsg = "[DUT: {}]: Neighbor {} missing in show o/p".format(
                dut, neighbor_ip
            )
            return errormsg

        show_bgp_graceful_json_out = show_bgp_graceful_json[neighbor_ip]
        if show_bgp_graceful_json_out.get("neighborAddr") == neighbor_ip:
            logger.info(
                "[DUT: {}]: Neighbor ip matched {}".format(dut, neighbor_ip)
            )
        else:
            errormsg = "[DUT: {}]: Neighbor ip is NOT matched {}".format(
                dut, neighbor_ip
            )
            return errormsg

        # Graceful-restart timer
        peer_bgp = input_dict[peer]["bgp"]
        if "graceful-restart" in peer_bgp and "timer" in peer_bgp["graceful-restart"]:
            timers = peer_bgp["graceful-restart"]["timer"]
            if "restart-time" in timers:
                receivedTimer = timers["restart-time"]
                # .get() chain: a missing "timers" section becomes an error
                # message rather than a KeyError.
                found_timer = show_bgp_graceful_json_out.get("timers", {}).get(
                    "receivedRestartTimer"
                )
                if found_timer == receivedTimer:
                    logger.info(
                        "receivedRestartTimer is {}"
                        " on {} from peer {}".format(receivedTimer, router, peer)
                    )
                else:
                    errormsg = (
                        "receivedRestartTimer is not"
                        " as expected {}".format(receivedTimer)
                    )
                    return errormsg

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return True
@retry(retry_timeout=8)
def verify_gr_address_family(
    tgen, topo, addr_type, addr_family, dut, peer, expected=True
):
    """
    This API is to verify gr_address_family in the BGP gr capability advertised
    by the neighbor router

    Parameters
    ----------
    * `tgen`: topogen object
    * `topo`: input json file data
    * `addr_type` : ip type ipv4/ipv6
    * `addr_family` : address family key, "ipv4Unicast" or "ipv6Unicast"
    * `dut`: input dut router name
    * `peer`: input peer router to check
    * `expected` : expected results from API, by-default True. Not read by
                   this body; presumably consumed by the @retry wrapper
                   (TODO confirm)

    Usage
    -----
    result = verify_gr_address_family(tgen, topo, "ipv4", "ipv4Unicast", "r1", "r3")

    Returns
    -------
    errormsg(str), True when the address family is present, or None when
    `addr_type` is rejected by check_address_types()
    """

    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))

    if not check_address_types(addr_type):
        logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
        return None

    routers = tgen.routers()
    if dut not in routers:
        return "{} not in routers".format(dut)

    rnode = routers[dut]
    bgp_addr_type = topo["routers"][dut]["bgp"]["address_family"]

    if addr_type not in bgp_addr_type:
        return "{} not in bgp_addr_types".format(addr_type)

    if peer not in bgp_addr_type[addr_type]["unicast"]["neighbor"]:
        return "{} not a peer of {} over {}".format(peer, dut, addr_type)

    nbr_links = topo["routers"][peer]["links"]
    if dut not in nbr_links or addr_type not in nbr_links[dut]:
        return "peer {} missing back link to {} over {}".format(peer, dut, addr_type)

    neighbor_ip = nbr_links[dut][addr_type].split("/")[0]

    logger.info(
        "[DUT: {}]: Checking bgp graceful-restart show o/p {} for {}".format(
            dut, neighbor_ip, addr_family
        )
    )

    show_bgp_graceful_json = run_frr_cmd(
        rnode,
        "show bgp {} neighbor {} graceful-restart json".format(addr_type, neighbor_ip),
        isjson=True,
    )

    # Report a neighbor missing from the output as an error message instead
    # of raising KeyError, so the retry wrapper can poll again.
    if not show_bgp_graceful_json or neighbor_ip not in show_bgp_graceful_json:
        return "Neighbor {} missing in show o/p of {}".format(neighbor_ip, dut)

    show_bgp_graceful_json_out = show_bgp_graceful_json[neighbor_ip]

    if show_bgp_graceful_json_out.get("neighborAddr") == neighbor_ip:
        logger.info("Neighbor ip matched {}".format(neighbor_ip))
    else:
        errormsg = "Neighbor ip NOT a match {}".format(neighbor_ip)
        return errormsg

    if addr_family not in ("ipv4Unicast", "ipv6Unicast"):
        errormsg = "Address family: {} is not supported for {} ".format(
            addr_family, neighbor_ip
        )
        return errormsg

    if addr_family not in show_bgp_graceful_json_out:
        errormsg = "{} NOT present for {} ".format(addr_family, neighbor_ip)
        return errormsg

    logger.info("{} present for {} ".format(addr_family, neighbor_ip))

    # The exit trace used to sit after an if/elif/else whose every branch
    # returned, so it was never emitted on the success path.
    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return True
4056 @retry(retry_timeout
=12)
4057 def verify_attributes_for_evpn_routes(
4071 API to verify rd and rt value using "sh bgp l2vpn evpn 10.1.1.1"
4075 * `tgen`: topogen object
4076 * `topo` : json file data
4077 * `dut` : device under test
4078 * `input_dict`: having details like - for which route, rd value
4079 needs to be verified
4080 * `rd` : route distinguisher
4081 * `rt` : route target
4082 * `ethTag` : Ethernet Tag
4083 * `ipLen` : IP prefix length
4084 * `rd_peer` : Peer name from which RD will be auto-generated
4085 * `rt_peer` : Peer name from which RT will be auto-generated
4086 * `expected` : expected results from API, by-default True
4093 "network": [NETWORK1_1[addr_type]],
4094 "next_hop": NEXT_HOP_IP[addr_type],
4100 result = verify_attributes_for_evpn_routes(tgen, topo,
4101 input_dict, rd = "10.0.0.33:1")
4105 errormsg(str) or True
4108 logger
.debug("Entering lib API: {}".format(sys
._getframe
().f_code
.co_name
))
4109 for router
in input_dict
.keys():
4110 rnode
= tgen
.routers()[dut
]
4112 if "static_routes" in input_dict
[router
]:
4113 for static_route
in input_dict
[router
]["static_routes"]:
4114 network
= static_route
["network"]
4116 if "vrf" in static_route
:
4117 vrf
= static_route
["vrf"]
4119 if type(network
) is not list:
4122 for route
in network
:
4123 route
= route
.split("/")[0]
4124 _addr_type
= validate_ip_address(route
)
4125 if "v4" in _addr_type
:
4127 elif "v6" in _addr_type
:
4130 cmd
= "show bgp l2vpn evpn {} json".format(route
)
4131 evpn_rd_value_json
= run_frr_cmd(rnode
, cmd
, isjson
=True)
4132 if not bool(evpn_rd_value_json
):
4133 errormsg
= "No output for '{}' cli".format(cmd
)
4136 if rd
is not None and rd
!= "auto":
4138 "[DUT: %s]: Verifying rd value for " "evpn route %s:",
4143 if rd
in evpn_rd_value_json
:
4144 rd_value_json
= evpn_rd_value_json
[rd
]
4145 if rd_value_json
["rd"] != rd
:
4147 "[DUT: %s] Failed: Verifying"
4148 " RD value for EVPN route: %s"
4149 "[FAILED]!!, EXPECTED : %s "
4151 % (dut
, route
, rd
, rd_value_json
["rd"])
4157 "[DUT %s]: Verifying RD value for"
4158 " EVPN route: %s [PASSED]|| "
4159 "Found Expected: %s",
4168 "[DUT: %s] RD : %s is not present"
4169 " in cli json output" % (dut
, rd
)
4175 "[DUT: %s]: Verifying auto-rd value for " "evpn route %s:",
4184 rnode
= tgen
.routers()[rd_peer
]
4185 vrfs
= topo
["routers"][rd_peer
]["vrfs"]
4186 for vrf_dict
in vrfs
:
4187 vni_dict
[vrf_dict
["name"]] = index
4190 show_bgp_json
= run_frr_cmd(
4191 rnode
, "show bgp vrf all summary json", isjson
=True
4194 # Verifying output dictionary show_bgp_json is empty
4195 if not bool(show_bgp_json
):
4196 errormsg
= "BGP is not running"
4199 show_bgp_json_vrf
= show_bgp_json
[vrf
]
4200 for afi
, afi_data
in show_bgp_json_vrf
.items():
4201 if input_afi
not in afi
:
4203 router_id
= afi_data
["routerId"]
4206 rd
= "{}:{}".format(router_id
, vni_dict
[vrf
])
4207 for _rd
, rd_value_json
in evpn_rd_value_json
.items():
4209 str(rd_value_json
["rd"].split(":")[0])
4214 if int(rd_value_json
["rd"].split(":")[1]) > 0:
4219 "[DUT %s]: Verifying RD value for"
4221 "Found Expected: %s",
4224 rd_value_json
["rd"],
4229 "[DUT: %s] Failed: Verifying"
4230 " RD value for EVPN route: %s"
4231 " FOUND : %s" % (dut
, route
, rd_value_json
["rd"])
4238 "[DUT: %s]: Verifying auto-rt value for " "evpn route %s:",
4244 rnode
= tgen
.routers()[rt_peer
]
4245 show_bgp_json
= run_frr_cmd(
4246 rnode
, "show bgp vrf all summary json", isjson
=True
4249 # Verifying output dictionary show_bgp_json is empty
4250 if not bool(show_bgp_json
):
4251 errormsg
= "BGP is not running"
4254 show_bgp_json_vrf
= show_bgp_json
[vrf
]
4255 for afi
, afi_data
in show_bgp_json_vrf
.items():
4256 if input_afi
not in afi
:
4258 as_num
= afi_data
["as"]
4260 show_vrf_vni_json
= run_frr_cmd(
4261 rnode
, "show vrf vni json", isjson
=True
4264 vrfs
= show_vrf_vni_json
["vrfs"]
4265 for vrf_dict
in vrfs
:
4266 if vrf_dict
["vrf"] == vrf
:
4267 vni_dict
[vrf_dict
["vrf"]] = str(vrf_dict
["vni"])
4269 # If AS is 4 byte, FRR uses only the lower 2 bytes of ASN+VNI
4270 # for auto derived RT value.
4272 as_bin
= bin(as_num
)
4273 as_bin
= as_bin
[-16:]
4274 as_num
= int(as_bin
, 2)
4276 rt
= "{}:{}".format(str(as_num
), vni_dict
[vrf
])
4277 for _rd
, route_data
in evpn_rd_value_json
.items():
4278 if route_data
["ip"] == route
:
4279 for rt_data
in route_data
["paths"]:
4280 if vni_dict
[vrf
] == rt_data
["vni"]:
4281 rt_string
= rt_data
["extendedCommunity"][
4284 rt_input
= "RT:{}".format(rt
)
4285 if rt_input
not in rt_string
:
4294 % (dut
, route
, rt_input
, rt_string
)
4300 "[DUT %s]: Verifying "
4301 "RT value for EVPN "
4302 "route: %s [PASSED]||"
4303 "Found Expected: %s",
4312 "[DUT: %s] Route : %s is not"
4313 " present in cli json output" % (dut
, route
)
4317 if rt
is not None and rt
!= "auto":
4319 "[DUT: %s]: Verifying rt value for " "evpn route %s:",
4324 if type(rt
) is not list:
4328 for _rd
, route_data
in evpn_rd_value_json
.items():
4329 if route_data
["ip"] == route
:
4330 for rt_data
in route_data
["paths"]:
4331 rt_string
= rt_data
["extendedCommunity"][
4334 rt_input
= "RT:{}".format(_rt
)
4335 if rt_input
not in rt_string
:
4337 "[DUT: %s] Failed: "
4338 "Verifying RT value "
4339 "for EVPN route: %s"
4343 % (dut
, route
, rt_input
, rt_string
)
4349 "[DUT %s]: Verifying RT"
4350 " value for EVPN route:"
4352 "Found Expected: %s",
4361 "[DUT: %s] Route : %s is not"
4362 " present in cli json output" % (dut
, route
)
4366 if ethTag
is not None:
4368 "[DUT: %s]: Verifying ethTag value for " "evpn route :", dut
4371 for _rd
, route_data
in evpn_rd_value_json
.items():
4372 if route_data
["ip"] == route
:
4373 if route_data
["ethTag"] != ethTag
:
4375 "[DUT: %s] RD: %s, Failed: "
4376 "Verifying ethTag value "
4377 "for EVPN route: %s"
4386 route_data
["ethTag"],
4393 "[DUT %s]: RD: %s, Verifying "
4394 "ethTag value for EVPN route:"
4396 "Found Expected: %s",
4406 "[DUT: %s] RD: %s, Route : %s "
4407 "is not present in cli json "
4408 "output" % (dut
, _rd
, route
)
4412 if ipLen
is not None:
4414 "[DUT: %s]: Verifying ipLen value for " "evpn route :", dut
4417 for _rd
, route_data
in evpn_rd_value_json
.items():
4418 if route_data
["ip"] == route
:
4419 if route_data
["ipLen"] != int(ipLen
):
4421 "[DUT: %s] RD: %s, Failed: "
4422 "Verifying ipLen value "
4423 "for EVPN route: %s"
4427 % (dut
, _rd
, route
, ipLen
, route_data
["ipLen"])
4433 "[DUT %s]: RD: %s, Verifying "
4434 "ipLen value for EVPN route:"
4436 "Found Expected: %s",
4446 "[DUT: %s] RD: %s, Route : %s "
4447 "is not present in cli json "
4448 "output " % (dut
, _rd
, route
)
4452 logger
.debug("Exiting lib API: {}".format(sys
._getframe
().f_code
.co_name
))
@retry(retry_timeout=10)
def verify_evpn_routes(
    tgen, topo, dut, input_dict, routeType=5, EthTag=0, next_hop=None, expected=True
):
    """
    API to verify evpn routes using "show bgp l2vpn evpn route json"

    Parameters
    ----------
    * `tgen`: topogen object
    * `topo` : json file data (not read by this function)
    * `dut` : device under test, the router the CLI is run on
    * `input_dict`: per-router dict; every entry of its "static_routes" list
                    carries the "network" (one prefix or a list of prefixes)
                    that has to be present as an EVPN route
    * `routeType` : Route type 5 is supported as of now
    * `EthTag` : Ethernet tag, by-default is 0
    * `next_hop` : Prefered nexthop for the evpn routes
    * `expected` : expected results from API, by-default True. Not read in
                   this body. NOTE(review): presumably consumed by the
                   retry decorator - confirm.

    Usage
    -----
    input_dict = {
        "r1": {
            "static_routes": [
                {
                    "network": [NETWORK1_1[addr_type]],
                    "next_hop": NEXT_HOP_IP[addr_type],
                }
            ]
        }
    }
    result = verify_evpn_routes(tgen, topo, dut, input_dict)

    Returns
    -------
    errormsg(str) on the first failed check, True once every listed prefix
    has been verified, False when input_dict listed no prefix at all
    """

    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))

    cmd = "show bgp l2vpn evpn route json"
    rnode = tgen.routers()[dut]
    verified = False

    for router in input_dict:
        logger.info("[DUT: %s]: Verifying evpn routes: ", dut)

        for static_route in input_dict[router].get("static_routes", []):
            network = static_route["network"]
            if not isinstance(network, list):
                network = [network]
            if not network:
                continue

            # The CLI output does not depend on the prefix being checked, so
            # it is fetched once per static-route entry, not once per prefix.
            evpn_value_json = run_frr_cmd(rnode, cmd, isjson=True)

            if not evpn_value_json:
                return "No output for '{}' cli".format(cmd)

            if evpn_value_json["numPrefix"] == 0:
                return "[DUT: %s]: No EVPN prefixes exist" % (dut)

            # Only dict-valued top-level entries are searched for prefixes;
            # scalar entries such as "numPrefix" are skipped.
            rd_tables = [
                rd_data
                for rd_data in evpn_value_json.values()
                if isinstance(rd_data, dict)
            ]

            for route in network:
                # partition() keeps a prefix without "/" from raising; such
                # a prefix is simply reported as missing below.
                address, _, ip_len = route.partition("/")
                prefix = "[{}]:[{}]:[{}]:[{}]".format(
                    routeType, EthTag, ip_len, address
                )

                holders = [table for table in rd_tables if prefix in table]
                if not holders:
                    return "[DUT: %s]: Missing EVPN routes: %s [FAILED]!!" % (
                        dut,
                        [prefix],
                    )

                for table in holders:
                    for path_group in table[prefix]["paths"]:
                        # NOTE(review): each "paths" element is assumed to
                        # be a list of path dicts; a bare dict is tolerated.
                        if isinstance(path_group, dict):
                            path_group = [path_group]

                        for path in path_group:
                            if path["routeType"] != routeType:
                                return (
                                    "[DUT: %s]: "
                                    "Verifying routeType "
                                    "for EVPN route: %s "
                                    "[FAILED]!! "
                                    "Expected: %s, "
                                    "Found: %s"
                                    % (dut, prefix, routeType, path["routeType"])
                                )

                            if not next_hop:
                                continue

                            for nh_dict in path["nexthops"]:
                                if nh_dict["ip"] != next_hop:
                                    return (
                                        "[DUT: %s]: "
                                        "Verifying nexthop "
                                        "for EVPN route: %s "
                                        "[FAILED]!! "
                                        "Expected: %s, "
                                        "Found: %s"
                                        % (dut, prefix, next_hop, nh_dict["ip"])
                                    )

                logger.info(
                    "[DUT %s]: Verifying "
                    "EVPN route : %s, "
                    "routeType: %s is "
                    "installed [PASSED]|| ",
                    dut,
                    prefix,
                    routeType,
                )
                verified = True

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))

    return verified
@retry(retry_timeout=10)
def verify_bgp_bestpath(tgen, addr_type, input_dict):
    """
    Verifies bgp next hop values in best-path output

    Parameters
    ----------
    * `tgen` : topogen object
    * `addr_type` : Address type ipv4/ipv6
    * `input_dict`: keyed by device under test; for each address type a
                    list of dicts with "network", "bestpath" and an
                    optional "vrf"

    Usage
    -----
        input_dict_1 = {
            "r1": {
                "ipv4" : [
                    {
                        "bestpath": "50.0.0.1",
                        "multipath": ["50.0.0.1", "50.0.0.2"],
                        "network": "100.0.0.0/24"
                    }
                ],
                "ipv6" : [
                    {
                        "bestpath": "1000::1",
                        "multipath": ["1000::1", "1000::2"],
                        "network": "2000::1/128"
                    }
                ]
            }
        }

        result = verify_bgp_bestpath(tgen, addr_type, input_dict_1)

    Returns
    -------
    errormsg(str) on the first mismatch, True when every listed network has
    the expected bestpath, False when nothing was listed
    """

    result = False
    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
    for dut in input_dict:
        rnode = tgen.routers()[dut]

        logger.info("[DUT: %s]: Verifying bgp bestpath and multipath " "routes:", dut)

        for network_dict in input_dict[dut][addr_type]:
            # .get() instead of .setdefault(): reading the expectation must
            # not add None-valued keys to the caller's dictionary.
            nw_addr = network_dict.get("network")
            vrf = network_dict.get("vrf")
            bestpath = network_dict.get("bestpath")

            if vrf:
                cmd = "show bgp vrf {} {} {} bestpath json".format(
                    vrf, addr_type, nw_addr
                )
            else:
                cmd = "show bgp {} {} bestpath json".format(addr_type, nw_addr)

            data = run_frr_cmd(rnode, cmd, isjson=True)
            paths = data.get("paths") if isinstance(data, dict) else None
            if not paths:
                return "DUT:[{}] No path found for network: {} in '{}'".format(
                    dut, nw_addr, cmd
                )
            route = paths[0]

            # Reset for every network so that a value found for an earlier
            # network can never be compared against this one.
            _bestpath = None
            if "bestpath" in route and route["bestpath"].get("overall") is True:
                _bestpath = route["nexthops"][0]["ip"]

            if _bestpath != bestpath:
                return (
                    "DUT:[{}] Bestpath do not match for"
                    " network: {}, Expected "
                    " {} as bgp bestpath found {}".format(
                        dut, nw_addr, bestpath, _bestpath
                    )
                )

            logger.info(
                "DUT:[{}] Found expected bestpath: "
                " {} for network: {}".format(dut, _bestpath, nw_addr)
            )
            result = True

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return result
def verify_tcp_mss(tgen, dut, neighbour, configured_tcp_mss, vrf=None):
    """
    This api is used to verify the tcp-mss value assigned to a neigbour of DUT

    Parameters
    ----------
    * `tgen` : topogen object
    * `dut`: device under test
    * `neighbour`: neighbour IP address, used as the key into the CLI output
    * `configured_tcp_mss`: The TCP-MSS value to be verified
    * `vrf`: optional vrf name; when set the vrf form of the CLI is used

    Usage
    -----
    result = verify_tcp_mss(tgen, dut,neighbour,configured_tcp_mss)

    Returns
    -------
    errormsg(str) or True
    """

    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
    rnode = tgen.routers()[dut]
    if vrf:
        cmd = "show bgp vrf {} neighbors {} json".format(vrf, neighbour)
    else:
        cmd = "show bgp neighbors {} json".format(neighbour)

    # Execute the command
    show_vrf_stats = run_frr_cmd(rnode, cmd, isjson=True)

    # Verify TCP-MSS on router
    logger.info("Verify that no core is observed")
    if tgen.routers_have_failure():
        result = "Core observed while running CLI: %s" % (cmd)
    else:
        neighbour_stats = None
        if isinstance(show_vrf_stats, dict):
            neighbour_stats = show_vrf_stats.get(neighbour)

        if not isinstance(neighbour_stats, dict):
            # Without this guard a missing neighbour ends in an
            # AttributeError on None instead of a readable error string.
            result = "Neighbour {} not found in output of CLI: {}".format(
                neighbour, cmd
            )
        else:
            found_tcp_mss = neighbour_stats.get("bgpTcpMssConfigured")
            if configured_tcp_mss == found_tcp_mss:
                logger.debug("Configured TCP - MSS Found: {}".format(found_tcp_mss))
                result = True
            else:
                logger.debug(
                    "TCP-MSS Mismatch ,configured {} expecting {}".format(
                        found_tcp_mss, configured_tcp_mss
                    )
                )
                result = "TCP-MSS Mismatch"

    logger.debug("Exiting lib API: {}".format(sys._getframe().f_code.co_name))
    return result
def get_dut_as_number(tgen, dut):
    """
    API to get the Autonomous Number of the given DUT

    params:
    =======
    tgen : topogen object (replaced by get_topogen() inside the function)
    dut : Device Under test

    returns :
    =======
    Success : DUT Autonomous number
    Fail : Error message is logged and Boolean False is returned
    """
    tgen = get_topogen()
    routers = tgen.routers()
    if dut not in routers:
        logger.error("ERROR...! Unknown dut {} in topolgy".format(dut))
        return False

    show_bgp_json = run_frr_cmd(routers[dut], "sh ip bgp summary json ", isjson=True)

    # An empty summary or a missing address family means no AS is known; it
    # is reported the same way as a falsy AS number instead of raising.
    as_number = None
    if isinstance(show_bgp_json, dict):
        as_number = show_bgp_json.get("ipv4Unicast", {}).get("as")

    if as_number:
        logger.info(
            "[dut {}] DUT contains Automnomous number :: {} ".format(dut, as_number)
        )
        return as_number

    logger.error(
        "[dut {}] ERROR....! DUT doesnot contain any Automnomous number ".format(dut)
    )
    return False
def get_prefix_count_route(
    tgen, topo, dut, peer, vrf=None, link=None, sent=None, received=None
):
    """
    API to return the prefix count of default originate the given DUT

    params:
    =======
    tgen : topogen object (replaced by get_topogen() inside the function)
    topo : json file data, used to look up the peer's link addresses
    dut : Device under test
    peer : neigbor on which you are expecting the route to be received
    vrf : optional vrf name; when set the vrf form of the CLI is used
    link : optional link name on the peer; defaults to the link named `dut`
    sent : when truthy (and `received` is not) the sent counter is returned
    received : when truthy the received counter is returned (the default)

    returns :
    =======
    prefix_count as dict with "ipv4_count" and "ipv6_count" values, or None
    when dut is not part of the topology or the neighbor is not listed
    """
    # The neighbor address is the peer's own address on the link facing the
    # dut (or on the explicitly named link).
    peer_links = topo["routers"][peer]["links"]
    link_data = peer_links[link] if link else peer_links[dut]
    neighbor_ipv4_address = link_data["ipv4"].split("/")[0]
    neighbor_ipv6_address = link_data["ipv6"].split("/")[0]

    tgen = get_topogen()
    routers = tgen.routers()
    if dut not in routers:
        logger.error("ERROR...! Unknown dut {} in topolgy".format(dut))
        return None
    rnode = routers[dut]

    if vrf:
        ipv4_cmd = "sh ip bgp vrf {} summary json".format(vrf)
        ipv6_cmd = "sh ip bgp vrf {} ipv6 unicast summary json".format(vrf)
        # NOTE(review): the vrf path reports the received counter whatever
        # `sent` says - confirm whether `sent` should be honoured here.
        counter = "pfxRcd"
    else:
        ipv4_cmd = "sh ip bgp summary json "
        ipv6_cmd = "sh ip bgp ipv6 unicast summary json "
        # `received` wins over `sent`; with neither set the received counter
        # is reported.
        counter = "pfxSnt" if (sent and not received) else "pfxRcd"

    show_bgp_json_ipv4 = run_frr_cmd(rnode, ipv4_cmd, isjson=True)
    show_bgp_json_ipv6 = run_frr_cmd(rnode, ipv6_cmd, isjson=True)

    try:
        # The two summaries are indexed differently: the ipv4 one is nested
        # under "ipv4Unicast", the ipv6 one exposes "peers" at the top level.
        prefix_count = {
            "ipv4_count": show_bgp_json_ipv4["ipv4Unicast"]["peers"][
                neighbor_ipv4_address
            ][counter],
            "ipv6_count": show_bgp_json_ipv6["peers"][neighbor_ipv6_address][counter],
        }
    except (KeyError, TypeError) as missing:
        logger.error(
            "ERROR...! [DUT:{}] prefix count towards neighbor ipv4 : {} and ipv6 : {} is not available : {}".format(
                dut, neighbor_ipv4_address, neighbor_ipv6_address, missing
            )
        )
        return None

    if vrf:
        logger.info(
            "The Prefix Count of the [DUT:{} : vrf [{}] ] towards neighbor ipv4 : {} and ipv6 : {} is : {}".format(
                dut,
                vrf,
                neighbor_ipv4_address,
                neighbor_ipv6_address,
                prefix_count,
            )
        )
    else:
        logger.info(
            "The Prefix Count of the DUT:{} towards neighbor ipv4 : {} and ipv6 : {} is : {}".format(
                dut, neighbor_ipv4_address, neighbor_ipv6_address, prefix_count
            )
        )
    return prefix_count
def _rib_default_route_attrs_ok(
    family,
    route_key,
    entries,
    expected_nexthop,
    origin,
    locPrf,
    metric,
    expected_aspath,
):
    """Check one address family's default route entries from the BGP RIB.

    `family` is "ipv4" or "ipv6", `route_key` the prefix the entries were
    found under and `entries` the list of path dicts for that prefix.
    Returns True when the expected next hop and every requested attribute
    match, False (after logging the mismatch) otherwise.
    """
    label = family.upper()
    if not entries:
        logger.error(
            "ERROR... {}::! Default route {} has no path in RIB".format(
                label, route_key
            )
        )
        return False

    # Every advertised next-hop address of every path is a candidate, which
    # covers the global and the link-local address of an ipv6 path alike.
    rib_next_hops = [
        nh["ip"]
        for entry in entries
        for nh in entry.get("nexthops", [])
        if "ip" in nh
    ]
    if family in expected_nexthop:
        wanted_nexthop = expected_nexthop[family]
        if wanted_nexthop in rib_next_hops:
            logger.info(
                "Default routes [{}] obtained from {} .....PASSED".format(
                    route_key, wanted_nexthop
                )
            )
        else:
            logger.error(
                "ERROR ...! Default routes [{}] expected from {} obtained {}".format(
                    route_key, wanted_nexthop, rib_next_hops
                )
            )
            return False

    # Attributes are read from the first path of the prefix.
    first_path = entries[0]
    attribute_checks = (
        ("origin", "Origin", origin, first_path.get("origin")),
        ("local preference", "Local preference", locPrf, first_path.get("locPrf")),
        ("metric", "metric", metric, first_path.get("metric")),
    )
    for pass_name, error_name, wanted, obtained in attribute_checks:
        # A falsy expectation means "do not check this attribute".
        if not wanted:
            continue
        if wanted == obtained:
            logger.info(
                "Dafault Route {} expected {} {} Found in RIB....PASSED".format(
                    route_key, pass_name, wanted
                )
            )
        else:
            logger.error(
                "ERROR... {}::! Expected {} is {} obtained {}".format(
                    label, error_name, wanted, obtained
                )
            )
            return False

    if expected_aspath:
        obtained_aspath = first_path.get("path", "")
        if expected_aspath in obtained_aspath:
            logger.info(
                "Dafault Route {} expected AS path {} Found in RIB....PASSED".format(
                    route_key, expected_aspath
                )
            )
        else:
            logger.error(
                "ERROR.....! Expected AS path {} obtained {}..... FAILED ".format(
                    expected_aspath, obtained_aspath
                )
            )
            return False

    logger.info(
        "{}:: Expected origin ['{}'] , Local Preference ['{}'] , Metric ['{}'] and AS path [{}] is found in RIB".format(
            label, origin, locPrf, metric, expected_aspath
        )
    )
    return True


@retry(retry_timeout=5)
def verify_rib_default_route(
    tgen,
    topo,
    dut,
    routes,
    expected_nexthop,
    metric=None,
    origin=None,
    locPrf=None,
    expected_aspath=None,
):
    """
    API to verify the 'Default route' in BGP RIB with the attributes the
    route carries (metric, local preference, origin, AS path)

    param
    =====
    tgen : topogen object (replaced by get_topogen() inside the function)
    topo : json file data (not read by this function)
    dut : device under test
    routes : dict whose "ipv4" value is the ipv4 default route prefix; the
             ipv6 default route is looked up as "0::0/0" or "::/0"
    expected_nexthop : dict with the "ipv4" / "ipv6" next hop expected for
                       the default route
    metric, origin, locPrf : expected attribute values; a falsy value
                             disables the corresponding check
    expected_aspath : string expected to be contained in the AS path; a
                      falsy value disables the check

    returns
    =======
    True when both address families match, False otherwise
    """
    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
    tgen = get_topogen()
    routers = tgen.routers()
    if dut not in routers:
        logger.error("ERROR...! Unknown dut {} in topolgy".format(dut))
        return False
    rnode = routers[dut]

    ipv4_routes = run_frr_cmd(rnode, "sh ip bgp json", isjson=True)
    ipv6_routes = run_frr_cmd(rnode, "sh ip bgp ipv6 unicast json", isjson=True)
    ipv4_table = (ipv4_routes or {}).get("routes", {})
    ipv6_table = (ipv6_routes or {}).get("routes", {})

    # The ipv6 default route shows up under either spelling.
    default_ipv4_route = routes["ipv4"]
    default_ipv6_route = next(
        (key for key in ("0::0/0", "::/0") if key in ipv6_table), None
    )

    per_family = (
        ("ipv4", default_ipv4_route, ipv4_table),
        ("ipv6", default_ipv6_route, ipv6_table),
    )
    for family, route_key, table in per_family:
        if route_key is None or route_key not in table:
            logger.error(
                "ERROR [ DUT {}] : The Default Route Not found in RIB".format(dut)
            )
            return False

        if not _rib_default_route_attrs_ok(
            family,
            route_key,
            table[route_key],
            expected_nexthop,
            origin,
            locPrf,
            metric,
            expected_aspath,
        ):
            return False

    logger.info("The attributes are found for default route in RIB ")
    return True
def _fib_route_has_nexthop(fib_routes, route_key, expected_nexthop, family):
    """Tell whether `route_key` in a route-table dump uses the expected next hop.

    Only the first entry of the prefix is inspected. Next hops without an
    "ip" field are ignored instead of raising.
    """
    fib_next_hops = [
        nh["ip"] for nh in fib_routes[route_key][0].get("nexthops", []) if "ip" in nh
    ]
    if expected_nexthop[family] in fib_next_hops:
        logger.info(
            "{} default route with next hop {} is found in FIB ".format(
                route_key, expected_nexthop
            )
        )
        return True

    logger.error(
        "ERROR .. ! {} default route with next hop {} is not found in FIB ".format(
            route_key, expected_nexthop
        )
    )
    return False


@retry(retry_timeout=5)
def verify_fib_default_route(tgen, topo, dut, routes, expected_nexthop):
    """
    API to verify the 'Default route' in FIB

    param
    =====
    tgen : topogen object (replaced by get_topogen() inside the function)
    topo : json file data (not read by this function)
    dut : device under test
    routes : dict with the "ipv4" and "ipv6" default route prefixes
    expected_nexthop : dict with the "ipv4" and "ipv6" next hop expected
                       for the default route

    returns
    =======
    True when both default routes are present with the expected next hop,
    False otherwise
    """
    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
    tgen = get_topogen()
    routers = tgen.routers()
    if dut not in routers:
        logger.error("ERROR...! Unknown dut {} in topolgy".format(dut))
        return False
    rnode = routers[dut]

    ipv4_routes = run_frr_cmd(rnode, "sh ip route json", isjson=True) or {}
    ipv6_routes = run_frr_cmd(rnode, "sh ipv6 route json", isjson=True) or {}

    is_ipv4_default_route_found = False
    is_ipv6_default_route_found = False

    if routes["ipv4"] in ipv4_routes:
        if not _fib_route_has_nexthop(
            ipv4_routes, routes["ipv4"], expected_nexthop, "ipv4"
        ):
            return False
        is_ipv4_default_route_found = True

    # "::/0" is preferred over the caller's spelling when both are present.
    ipv6_default_route = None
    if "::/0" in ipv6_routes:
        ipv6_default_route = "::/0"
    elif routes["ipv6"] in ipv6_routes:
        ipv6_default_route = routes["ipv6"]

    if ipv6_default_route is not None:
        if not _fib_route_has_nexthop(
            ipv6_routes, ipv6_default_route, expected_nexthop, "ipv6"
        ):
            return False
        is_ipv6_default_route_found = True

    if is_ipv4_default_route_found and is_ipv6_default_route_found:
        return True

    logger.error(
        "Default Route for ipv4 and ipv6 address family is not found in FIB "
    )
    return False
def _count_advertised_routes(dut, peer, expected, cli_output):
    """Count the expected networks found in one advertised-routes dump.

    A network counts when it is a key of "advertisedRoutes" or is contained
    in "bgpOriginatingDefaultNetwork". Either key may be absent from the
    output, in which case it simply matches nothing.
    """
    advertised = cli_output.get("advertisedRoutes", {})
    originated_default = cli_output.get("bgpOriginatingDefaultNetwork", "")

    matched = 0
    for entry in expected:
        network = entry["network"]
        if network in advertised or network in originated_default:
            matched += 1
            logger.info(
                "Success [DUT : {}] The Expected Route {} is advertised to {} ".format(
                    dut, network, peer
                )
            )
        else:
            logger.error(
                "ERROR....![DUT : {}] The Expected Route {} is not advertised to {} ".format(
                    dut, network, peer
                )
            )
    return matched


@retry(retry_timeout=5)
def verify_bgp_advertised_routes_from_neighbor(tgen, topo, dut, peer, expected_routes):
    """
    API to verify the routes that are advertised from dut to peer

    command used :
        "sh ip bgp neighbor <x.x.x.x> advertised-routes" and
        "sh ip bgp ipv6 unicast neighbor<x::x> advertised-routes"

    params:
    =======
    tgen : topogen object (replaced by get_topogen() inside the function)
    topo : json file data, used to look up the peer's link addresses
    dut : Device Under Tests
    Peer : Peer on which the routs is expected
    expected_routes : dual stack IPV4-and IPv6 routes to be verified; each
                      family holds a list of dicts with a "network" key

    returns: True / False
    """
    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
    tgen = get_topogen()

    peer_link = topo["routers"][peer]["links"][dut]
    peer_ipv4_neighbor_ip = peer_link["ipv4"].split("/")[0]
    peer_ipv6_neighbor_ip = peer_link["ipv6"].split("/")[0]

    routers = tgen.routers()
    if dut not in routers:
        logger.error("ERROR...! Unknown dut {} in topolgy".format(dut))
        return False
    rnode = routers[dut]

    ipv4_advertised_routes = run_frr_cmd(
        rnode,
        "sh ip bgp neighbor {} advertised-routes json".format(peer_ipv4_neighbor_ip),
        isjson=True,
    )
    ipv6_advertised_routes = run_frr_cmd(
        rnode,
        "sh ip bgp ipv6 unicast neighbor {} advertised-routes json".format(
            peer_ipv6_neighbor_ip
        ),
        isjson=True,
    )

    if not ipv4_advertised_routes:
        logger.error(ipv4_advertised_routes)
        logger.error(
            "ERROR...! [DUT : {}] No IPV4 Routes are advertised to the peer {}".format(
                dut, peer
            )
        )
        return False
    ipv4_route_count = _count_advertised_routes(
        dut, peer, expected_routes["ipv4"], ipv4_advertised_routes
    )

    if not ipv6_advertised_routes:
        logger.error(ipv6_advertised_routes)
        logger.error(
            "ERROR...! [DUT : {}] No IPV6 Routes are advertised to the peer {}".format(
                dut, peer
            )
        )
        return False
    ipv6_route_count = _count_advertised_routes(
        dut, peer, expected_routes["ipv6"], ipv6_advertised_routes
    )

    if ipv4_route_count == len(expected_routes["ipv4"]) and ipv6_route_count == len(
        expected_routes["ipv6"]
    ):
        return True

    logger.error(
        "ERROR ....! IPV4 : Expected Routes -> {} obtained ->{} ".format(
            expected_routes["ipv4"], ipv4_advertised_routes.get("advertisedRoutes")
        )
    )
    logger.error(
        "ERROR ....! IPV6 : Expected Routes -> {} obtained ->{} ".format(
            expected_routes["ipv6"], ipv6_advertised_routes.get("advertisedRoutes")
        )
    )
    return False
def _count_received_routes(dut, peer, expected, cli_output):
    """Count the expected networks that are keys of "receivedRoutes".

    A dump without the "receivedRoutes" key matches nothing.
    """
    received = cli_output.get("receivedRoutes", {})

    matched = 0
    for entry in expected:
        network = entry["network"]
        if network in received:
            matched += 1
            logger.info(
                "Success [DUT : {}] The Expected Route {} is received from {} ".format(
                    dut, network, peer
                )
            )
        else:
            logger.error(
                "ERROR....![DUT : {}] The Expected Route {} is not received from {} ".format(
                    dut, network, peer
                )
            )
    return matched


@retry(retry_timeout=5)
def verify_bgp_received_routes_from_neighbor(tgen, topo, dut, peer, expected_routes):
    """
    API to verify the bgp received routes

    command used:
    =============
        show ip bgp neighbor <x.x.x.x> received-routes
        show ip bgp ipv6 unicast neighbor <x::x> received-routes

    params
    =======
    tgen : topogen object (replaced by get_topogen() inside the function)
    topo : json file data, used to look up the peer's link addresses
    dut : Device Under Tests
    Peer : Peer on which the routs is expected
    expected_routes : dual stack IPV4-and IPv6 routes to be verified; each
                      family holds a list of dicts with a "network" key

    returns:
    ========
        True / False

    raises:
    =======
        AssertionError when the inbound soft configuration cannot be applied
    """
    logger.debug("Entering lib API: {}".format(sys._getframe().f_code.co_name))
    tgen = get_topogen()

    peer_link = topo["routers"][peer]["links"][dut]
    peer_ipv4_neighbor_ip = peer_link["ipv4"].split("/")[0]
    peer_ipv6_neighbor_ip = peer_link["ipv6"].split("/")[0]

    logger.info("Enabling Soft configuration to neighbor INBOUND ")
    neigbor_dict = {"ipv4": peer_ipv4_neighbor_ip, "ipv6": peer_ipv6_neighbor_ip}
    result = configure_bgp_soft_configuration(
        tgen, dut, neigbor_dict, direction="inbound"
    )
    assert (
        result is True
    ), " Failed to configure the soft configuration \n Error: {}".format(result)

    # sleep of 10 sec is required to get the routes on peer after soft
    # configuration
    sleep(10)

    routers = tgen.routers()
    if dut not in routers:
        logger.error("ERROR...! Unknown dut {} in topolgy".format(dut))
        return False
    rnode = routers[dut]

    ipv4_receieved_routes = run_frr_cmd(
        rnode,
        "sh ip bgp neighbor {} received-routes json".format(peer_ipv4_neighbor_ip),
        isjson=True,
    )
    ipv6_receieved_routes = run_frr_cmd(
        rnode,
        "sh ip bgp ipv6 unicast neighbor {} received-routes json".format(
            peer_ipv6_neighbor_ip
        ),
        isjson=True,
    )

    if not ipv4_receieved_routes:
        logger.error(ipv4_receieved_routes)
        logger.error(
            "ERROR...! [DUT : {}] No IPV4 Routes are received from the peer {}".format(
                dut, peer
            )
        )
        return False
    ipv4_route_count = _count_received_routes(
        dut, peer, expected_routes["ipv4"], ipv4_receieved_routes
    )

    if not ipv6_receieved_routes:
        logger.error(ipv6_receieved_routes)
        logger.error(
            "ERROR...! [DUT : {}] No IPV6 Routes are received from the peer {}".format(
                dut, peer
            )
        )
        return False
    ipv6_route_count = _count_received_routes(
        dut, peer, expected_routes["ipv6"], ipv6_receieved_routes
    )

    if ipv4_route_count == len(expected_routes["ipv4"]) and ipv6_route_count == len(
        expected_routes["ipv6"]
    ):
        return True

    # The dump of a received-routes query is keyed "receivedRoutes"; reading
    # "advertisedRoutes" here would raise while reporting the failure.
    logger.error(
        "ERROR ....! IPV4 : Expected Routes -> {} obtained ->{} ".format(
            expected_routes["ipv4"], ipv4_receieved_routes.get("receivedRoutes")
        )
    )
    logger.error(
        "ERROR ....! IPV6 : Expected Routes -> {} obtained ->{} ".format(
            expected_routes["ipv6"], ipv6_receieved_routes.get("receivedRoutes")
        )
    )
    return False
5561 def configure_bgp_soft_configuration(tgen
, dut
, neighbor_dict
, direction
):
5563 Api to configure the bgp soft configuration to show the received routes from peer
5566 dut : device under test route on which the sonfiguration to be applied
5567 neighbor_dict : dict element contains ipv4 and ipv6 neigbor ip
5568 direction : Directionon which it should be applied in/out
5574 logger
.info("Enabling Soft configuration to neighbor INBOUND ")
5575 local_as
= get_dut_as_number(tgen
, dut
)
5576 ipv4_neighbor
= neighbor_dict
["ipv4"]
5577 ipv6_neighbor
= neighbor_dict
["ipv6"]
5578 direction
= direction
.lower()
5579 if ipv4_neighbor
and ipv4_neighbor
:
5583 "router bgp {}".format(local_as
),
5584 "address-family ipv4 unicast",
5585 "neighbor {} soft-reconfiguration {} ".format(
5586 ipv4_neighbor
, direction
5588 "exit-address-family",
5589 "address-family ipv6 unicast",
5590 "neighbor {} soft-reconfiguration {} ".format(
5591 ipv6_neighbor
, direction
5593 "exit-address-family",
5597 result
= apply_raw_config(tgen
, raw_config
)
5599 "Success... [DUT : {}] The soft configuration onis applied on neighbors {} ".format(