2 # Copyright (c) 2019 by VMware, Inc. ("VMware")
3 # Used Copyright (c) 2018 by Network Device Education Foundation, Inc.
4 # ("NetDEF") in this file.
6 # Permission to use, copy, modify, and/or distribute this software
7 # for any purpose with or without fee is hereby granted, provided
8 # that the above copyright notice and this permission notice appear
11 # THE SOFTWARE IS PROVIDED "AS IS" AND VMWARE DISCLAIMS ALL WARRANTIES
12 # WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
13 # MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL VMWARE BE LIABLE FOR
14 # ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY
15 # DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS,
16 # WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
17 # ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE
21 from copy
import deepcopy
22 from time
import sleep
27 from lib
import topotest
28 from lib
.topolog
import logger
30 # Import common_config to use commomnly used APIs
31 from lib
.common_config
import (
32 create_common_configuration
,
34 load_config_to_router
,
38 find_interface_with_greater_ip
,
44 LOGDIR
= "/tmp/topotests/"
48 def create_router_bgp(tgen
, topo
, input_dict
=None, build
=False, load_config
=True):
50 API to configure bgp on router
54 * `tgen` : Topogen object
55 * `topo` : json file data
56 * `input_dict` : Input dict data, required when configuring from testcase
57 * `build` : Only for initial setup phase this is set as True.
65 "router_id": "22.22.22.22",
67 "graceful-restart": True,
68 "preserve-fw-state": True,
72 "select-defer-time": 30,
79 "redist_type": "static",
84 {"redist_type": "connected"}
86 "advertise_networks": [
88 "network": "20.0.0.0/32",
92 "network": "30.0.0.0/32",
103 "number_occurences": 2
112 "name": "RMAP_MED_R3",
115 "next_hop_self": True
117 "r1": {"graceful-restart-helper": True}
133 logger
.debug("Entering lib API: {}".format(sys
._getframe
().f_code
.co_name
))
136 # Flag is used when testing ipv6 over ipv4 or vice-versa
140 input_dict
= deepcopy(topo
)
142 topo
= topo
["routers"]
143 input_dict
= deepcopy(input_dict
)
145 for router
in input_dict
.keys():
146 if "bgp" not in input_dict
[router
]:
147 logger
.debug("Router %s: 'bgp' not present in input_dict", router
)
150 bgp_data_list
= input_dict
[router
]["bgp"]
152 if type(bgp_data_list
) is not list:
153 bgp_data_list
= [bgp_data_list
]
155 for bgp_data
in bgp_data_list
:
156 data_all_bgp
= __create_bgp_global(tgen
, bgp_data
, router
, build
)
158 bgp_addr_data
= bgp_data
.setdefault("address_family", {})
160 if not bgp_addr_data
:
162 "Router %s: 'address_family' not present in "
163 "input_dict for BGP",
168 ipv4_data
= bgp_addr_data
.setdefault("ipv4", {})
169 ipv6_data
= bgp_addr_data
.setdefault("ipv6", {})
173 if ipv4_data
.setdefault("unicast", {})
174 or ipv6_data
.setdefault("unicast", {})
179 data_all_bgp
= __create_bgp_unicast_neighbor(
185 config_data
=data_all_bgp
,
189 result
= create_common_configuration(
190 tgen
, router
, data_all_bgp
, "bgp", build
, load_config
192 except InvalidCLIError
:
194 errormsg
= traceback
.format_exc()
195 logger
.error(errormsg
)
198 logger
.debug("Exiting lib API: create_router_bgp()")
202 def __create_bgp_global(tgen
, input_dict
, router
, build
=False):
204 Helper API to create bgp global configuration.
208 * `tgen` : Topogen object
209 * `input_dict` : Input dict data, required when configuring from testcase
210 * `router` : router id to be configured.
211 * `build` : Only for initial setup phase this is set as True.
219 logger
.debug("Entering lib API: {}".format(sys
._getframe
().f_code
.co_name
))
221 bgp_data
= input_dict
222 del_bgp_action
= bgp_data
.setdefault("delete", False)
226 if "local_as" not in bgp_data
and build
:
228 "Router %s: 'local_as' not present in input_dict" "for BGP", router
232 local_as
= bgp_data
.setdefault("local_as", "")
233 cmd
= "router bgp {}".format(local_as
)
234 vrf_id
= bgp_data
.setdefault("vrf", None)
236 cmd
= "{} vrf {}".format(cmd
, vrf_id
)
239 cmd
= "no {}".format(cmd
)
240 config_data
.append(cmd
)
244 config_data
.append(cmd
)
245 config_data
.append("no bgp ebgp-requires-policy")
247 router_id
= bgp_data
.setdefault("router_id", None)
248 del_router_id
= bgp_data
.setdefault("del_router_id", False)
250 config_data
.append("no bgp router-id")
252 config_data
.append("bgp router-id {}".format(router_id
))
254 config_data
.append("no bgp network import-check")
256 if "graceful-restart" in bgp_data
:
257 graceful_config
= bgp_data
["graceful-restart"]
259 graceful_restart
= graceful_config
.setdefault("graceful-restart", None)
261 graceful_restart_disable
= graceful_config
.setdefault(
262 "graceful-restart-disable", None
265 preserve_fw_state
= graceful_config
.setdefault("preserve-fw-state", None)
267 disable_eor
= graceful_config
.setdefault("disable-eor", None)
269 if graceful_restart
== False:
270 cmd
= "no bgp graceful-restart"
272 cmd
= "bgp graceful-restart"
274 if graceful_restart
is not None:
275 config_data
.append(cmd
)
277 if graceful_restart_disable
== False:
278 cmd
= "no bgp graceful-restart-disable"
279 if graceful_restart_disable
:
280 cmd
= "bgp graceful-restart-disable"
282 if graceful_restart_disable
is not None:
283 config_data
.append(cmd
)
285 if preserve_fw_state
== False:
286 cmd
= "no bgp graceful-restart preserve-fw-state"
287 if preserve_fw_state
:
288 cmd
= "bgp graceful-restart preserve-fw-state"
290 if preserve_fw_state
is not None:
291 config_data
.append(cmd
)
293 if disable_eor
== False:
294 cmd
= "no bgp graceful-restart disable-eor"
296 cmd
= "bgp graceful-restart disable-eor"
298 if disable_eor
is not None:
299 config_data
.append(cmd
)
301 if "timer" in bgp_data
["graceful-restart"]:
302 timer
= bgp_data
["graceful-restart"]["timer"]
304 if "delete" in timer
:
305 del_action
= timer
["delete"]
309 for rs_timer
, value
in timer
.items():
310 rs_timer_value
= timer
.setdefault(rs_timer
, None)
312 if rs_timer_value
and rs_timer
!= "delete":
313 cmd
= "bgp graceful-restart {} {}".format(rs_timer
, rs_timer_value
)
316 cmd
= "no {}".format(cmd
)
318 config_data
.append(cmd
)
320 logger
.debug("Exiting lib API: {}".format(sys
._getframe
().f_code
.co_name
))
324 def __create_bgp_unicast_neighbor(
325 tgen
, topo
, input_dict
, router
, afi_test
, config_data
=None
328 Helper API to create configuration for address-family unicast
332 * `tgen` : Topogen object
333 * `topo` : json file data
334 * `input_dict` : Input dict data, required when configuring from testcase
335 * `router` : router id to be configured.
336 * `afi_test` : use when ipv6 needs to be tested over ipv4 or vice-versa
337 * `build` : Only for initial setup phase this is set as True.
341 logger
.debug("Entering lib API: {}".format(sys
._getframe
().f_code
.co_name
))
344 bgp_data
= input_dict
345 if "router bgp" in config_data
:
348 bgp_data
= input_dict
["address_family"]
350 for addr_type
, addr_dict
in bgp_data
.iteritems():
354 if not check_address_types(addr_type
) and not afi_test
:
357 addr_data
= addr_dict
["unicast"]
359 config_data
.append("address-family {} unicast".format(addr_type
))
360 advertise_network
= addr_data
.setdefault("advertise_networks", [])
361 for advertise_network_dict
in advertise_network
:
362 network
= advertise_network_dict
["network"]
363 if type(network
) is not list:
366 if "no_of_network" in advertise_network_dict
:
367 no_of_network
= advertise_network_dict
["no_of_network"]
371 del_action
= advertise_network_dict
.setdefault("delete", False)
373 # Generating IPs for verification
374 prefix
= str(ipaddr
.IPNetwork(unicode(network
[0])).prefixlen
)
375 network_list
= generate_ips(network
, no_of_network
)
376 for ip
in network_list
:
377 ip
= str(ipaddr
.IPNetwork(unicode(ip
)).network
)
379 cmd
= "network {}/{}".format(ip
, prefix
)
381 cmd
= "no {}".format(cmd
)
383 config_data
.append(cmd
)
385 max_paths
= addr_data
.setdefault("maximum_paths", {})
387 ibgp
= max_paths
.setdefault("ibgp", None)
388 ebgp
= max_paths
.setdefault("ebgp", None)
390 config_data
.append("maximum-paths ibgp {}".format(ibgp
))
392 config_data
.append("maximum-paths {}".format(ebgp
))
394 aggregate_addresses
= addr_data
.setdefault("aggregate_address", [])
395 for aggregate_address
in aggregate_addresses
:
396 network
= aggregate_address
.setdefault("network", None)
399 "Router %s: 'network' not present in " "input_dict for BGP", router
402 cmd
= "aggregate-address {}".format(network
)
404 as_set
= aggregate_address
.setdefault("as_set", False)
405 summary
= aggregate_address
.setdefault("summary", False)
406 del_action
= aggregate_address
.setdefault("delete", False)
408 cmd
= "{} as-set".format(cmd
)
410 cmd
= "{} summary".format(cmd
)
413 cmd
= "no {}".format(cmd
)
415 config_data
.append(cmd
)
417 redistribute_data
= addr_data
.setdefault("redistribute", {})
418 if redistribute_data
:
419 for redistribute
in redistribute_data
:
420 if "redist_type" not in redistribute
:
422 "Router %s: 'redist_type' not present in " "input_dict", router
425 cmd
= "redistribute {}".format(redistribute
["redist_type"])
426 redist_attr
= redistribute
.setdefault("attribute", None)
428 if isinstance(redist_attr
, dict):
429 for key
, value
in redist_attr
.items():
430 cmd
= "{} {} {}".format(cmd
, key
, value
)
432 cmd
= "{} {}".format(cmd
, redist_attr
)
434 del_action
= redistribute
.setdefault("delete", False)
436 cmd
= "no {}".format(cmd
)
437 config_data
.append(cmd
)
439 if "neighbor" in addr_data
:
440 neigh_data
= __create_bgp_neighbor(
441 topo
, input_dict
, router
, addr_type
, add_neigh
443 config_data
.extend(neigh_data
)
445 for addr_type
, addr_dict
in bgp_data
.iteritems():
446 if not addr_dict
or not check_address_types(addr_type
):
449 addr_data
= addr_dict
["unicast"]
450 if "neighbor" in addr_data
:
451 neigh_addr_data
= __create_bgp_unicast_address_family(
452 topo
, input_dict
, router
, addr_type
, add_neigh
455 config_data
.extend(neigh_addr_data
)
457 logger
.debug("Exiting lib API: {}".format(sys
._getframe
().f_code
.co_name
))
461 def __create_bgp_neighbor(topo
, input_dict
, router
, addr_type
, add_neigh
=True):
463 Helper API to create neighbor specific configuration
467 * `tgen` : Topogen object
468 * `topo` : json file data
469 * `input_dict` : Input dict data, required when configuring from testcase
470 * `router` : router id to be configured
474 logger
.debug("Entering lib API: {}".format(sys
._getframe
().f_code
.co_name
))
476 bgp_data
= input_dict
["address_family"]
477 neigh_data
= bgp_data
[addr_type
]["unicast"]["neighbor"]
479 for name
, peer_dict
in neigh_data
.iteritems():
480 for dest_link
, peer
in peer_dict
["dest_link"].iteritems():
481 nh_details
= topo
[name
]
483 if "vrfs" in topo
[router
]:
484 remote_as
= nh_details
["bgp"][0]["local_as"]
486 remote_as
= nh_details
["bgp"]["local_as"]
490 if dest_link
in nh_details
["links"].keys():
491 ip_addr
= nh_details
["links"][dest_link
][addr_type
].split("/")[0]
493 if "source_link" in peer
and peer
["source_link"] == "lo":
494 update_source
= topo
[router
]["links"]["lo"][addr_type
].split("/")[0]
496 neigh_cxt
= "neighbor {}".format(ip_addr
)
499 config_data
.append("{} remote-as {}".format(neigh_cxt
, remote_as
))
500 if addr_type
== "ipv6":
501 config_data
.append("address-family ipv6 unicast")
502 config_data
.append("{} activate".format(neigh_cxt
))
504 disable_connected
= peer
.setdefault("disable_connected_check", False)
505 keep_alive
= peer
.setdefault("keepalivetimer", 60)
506 hold_down
= peer
.setdefault("holddowntimer", 180)
507 password
= peer
.setdefault("password", None)
508 max_hop_limit
= peer
.setdefault("ebgp_multihop", 1)
509 graceful_restart
= peer
.setdefault("graceful-restart", None)
510 graceful_restart_helper
= peer
.setdefault("graceful-restart-helper", None)
511 graceful_restart_disable
= peer
.setdefault("graceful-restart-disable", None)
515 "{} update-source {}".format(neigh_cxt
, update_source
)
517 if disable_connected
:
519 "{} disable-connected-check".format(disable_connected
)
523 "{} update-source {}".format(neigh_cxt
, update_source
)
525 if int(keep_alive
) != 60 and int(hold_down
) != 180:
527 "{} timers {} {}".format(neigh_cxt
, keep_alive
, hold_down
)
531 config_data
.append("{} graceful-restart".format(neigh_cxt
))
532 elif graceful_restart
== False:
533 config_data
.append("no {} graceful-restart".format(neigh_cxt
))
535 if graceful_restart_helper
:
536 config_data
.append("{} graceful-restart-helper".format(neigh_cxt
))
537 elif graceful_restart_helper
== False:
538 config_data
.append("no {} graceful-restart-helper".format(neigh_cxt
))
540 if graceful_restart_disable
:
541 config_data
.append("{} graceful-restart-disable".format(neigh_cxt
))
542 elif graceful_restart_disable
== False:
543 config_data
.append("no {} graceful-restart-disable".format(neigh_cxt
))
546 config_data
.append("{} password {}".format(neigh_cxt
, password
))
548 if max_hop_limit
> 1:
550 "{} ebgp-multihop {}".format(neigh_cxt
, max_hop_limit
)
552 config_data
.append("{} enforce-multihop".format(neigh_cxt
))
554 logger
.debug("Exiting lib API: {}".format(sys
._getframe
().f_code
.co_name
))
558 def __create_bgp_unicast_address_family(
559 topo
, input_dict
, router
, addr_type
, add_neigh
=True
562 API prints bgp global config to bgp_json file.
566 * `bgp_cfg` : BGP class variables have BGP config saved in it for
568 * `local_as_no` : Local as number
569 * `router_id` : Router-id
570 * `ecmp_path` : ECMP max path
571 * `gr_enable` : BGP global gracefull restart config
575 logger
.debug("Entering lib API: {}".format(sys
._getframe
().f_code
.co_name
))
577 bgp_data
= input_dict
["address_family"]
578 neigh_data
= bgp_data
[addr_type
]["unicast"]["neighbor"]
580 for peer_name
, peer_dict
in deepcopy(neigh_data
).iteritems():
581 for dest_link
, peer
in peer_dict
["dest_link"].iteritems():
584 nh_details
= topo
[peer_name
]
585 activate_addr_family
= peer
.setdefault("activate", None)
586 deactivate_addr_family
= peer
.setdefault("deactivate", None)
588 if "source_link" in peer
and peer
["source_link"] == "lo":
589 for destRouterLink
, data
in sorted(nh_details
["links"].iteritems()):
590 if "type" in data
and data
["type"] == "loopback":
591 if dest_link
== destRouterLink
:
592 ip_addr
= nh_details
["links"][destRouterLink
][
598 if dest_link
in nh_details
["links"].keys():
600 ip_addr
= nh_details
["links"][dest_link
][addr_type
].split("/")[0]
601 if addr_type
== "ipv4" and bgp_data
["ipv6"]:
602 deactivate
= nh_details
["links"][dest_link
]["ipv6"].split("/")[
606 neigh_cxt
= "neighbor {}".format(ip_addr
)
607 config_data
.append("address-family {} unicast".format(addr_type
))
609 if activate_addr_family
is not None:
611 "address-family {} unicast".format(activate_addr_family
)
614 config_data
.append("{} activate".format(neigh_cxt
))
616 if deactivate
and activate_addr_family
is None:
617 config_data
.append("no neighbor {} activate".format(deactivate
))
619 if deactivate_addr_family
is not None:
621 "address-family {} unicast".format(deactivate_addr_family
)
623 config_data
.append("no {} activate".format(neigh_cxt
))
625 next_hop_self
= peer
.setdefault("next_hop_self", None)
626 send_community
= peer
.setdefault("send_community", None)
627 prefix_lists
= peer
.setdefault("prefix_lists", {})
628 route_maps
= peer
.setdefault("route_maps", {})
629 no_send_community
= peer
.setdefault("no_send_community", None)
630 allowas_in
= peer
.setdefault("allowas-in", None)
633 if next_hop_self
is not None:
634 if next_hop_self
is True:
635 config_data
.append("{} next-hop-self".format(neigh_cxt
))
637 config_data
.append("no {} next-hop-self".format(neigh_cxt
))
641 config_data
.append("{} send-community".format(neigh_cxt
))
644 if no_send_community
:
646 "no {} send-community {}".format(neigh_cxt
, no_send_community
)
649 if "allowas_in" in peer
:
650 allow_as_in
= peer
["allowas_in"]
651 config_data
.append("{} allowas-in {}".format(neigh_cxt
, allow_as_in
))
653 if "no_allowas_in" in peer
:
654 allow_as_in
= peer
["no_allowas_in"]
655 config_data
.append("no {} allowas-in {}".format(neigh_cxt
, allow_as_in
))
657 for prefix_list
in prefix_lists
:
658 name
= prefix_list
.setdefault("name", {})
659 direction
= prefix_list
.setdefault("direction", "in")
660 del_action
= prefix_list
.setdefault("delete", False)
663 "Router %s: 'name' not present in "
664 "input_dict for BGP neighbor prefix lists",
668 cmd
= "{} prefix-list {} {}".format(neigh_cxt
, name
, direction
)
670 cmd
= "no {}".format(cmd
)
671 config_data
.append(cmd
)
674 for route_map
in route_maps
:
675 name
= route_map
.setdefault("name", {})
676 direction
= route_map
.setdefault("direction", "in")
677 del_action
= route_map
.setdefault("delete", False)
680 "Router %s: 'name' not present in "
681 "input_dict for BGP neighbor route name",
685 cmd
= "{} route-map {} {}".format(neigh_cxt
, name
, direction
)
687 cmd
= "no {}".format(cmd
)
688 config_data
.append(cmd
)
691 number_occurences
= allowas_in
.setdefault("number_occurences", {})
692 del_action
= allowas_in
.setdefault("delete", False)
694 cmd
= "{} allowas-in {}".format(neigh_cxt
, number_occurences
)
697 cmd
= "no {}".format(cmd
)
699 config_data
.append(cmd
)
704 def modify_bgp_config_when_bgpd_down(tgen
, topo
, input_dict
):
706 API will save the current config to router's /etc/frr/ for BGPd
707 daemon(bgpd.conf file)
711 * `tgen` : Topogen object
712 * `topo` : json file data
713 * `input_dict` : defines for which router, and which config
718 # Modify graceful-restart config not to set f-bit
719 # and write to /etc/frr
721 # Api call to delete advertised networks
728 "advertise_networks": [
730 "network": "101.0.20.1/32",
739 "advertise_networks": [
741 "network": "5::1/128",
753 result = modify_bgp_config_when_bgpd_down(tgen, topo, input_dict)
757 logger
.debug("Entering lib API: {}".format(sys
._getframe
().f_code
.co_name
))
762 result
= create_router_bgp(
763 tgen
, topo
, input_dict
, build
=False, load_config
=False
765 if result
is not True:
768 # Copy bgp config file to /etc/frr
769 for dut
in input_dict
.keys():
770 router_list
= tgen
.routers()
771 for router
, rnode
in router_list
.iteritems():
775 TMPDIR
= os
.path
.join(LOGDIR
, tgen
.modname
)
777 logger
.info("Delete BGP config when BGPd is down in {}".format(router
))
778 # Reading the config from /tmp/topotests and
779 # copy to /etc/frr/bgpd.conf
780 cmd
= "cat {}/{}/{} >> /etc/frr/bgpd.conf".format(
781 TMPDIR
, router
, FRRCFG_FILE
783 router_list
[router
].run(cmd
)
785 except Exception as e
:
786 # handle any exception
787 logger
.error("Error %s occured. Arguments %s.", e
.message
, e
.args
)
790 errormsg
= traceback
.format_exc()
791 logger
.error(errormsg
)
794 logger
.debug("Exiting lib API: {}".format(sys
._getframe
().f_code
.co_name
))
798 #############################################
800 #############################################
801 @retry(attempts
=3, wait
=2, return_is_str
=True)
802 def verify_router_id(tgen
, topo
, input_dict
):
804 Running command "show ip bgp json" for DUT and reading router-id
805 from input_dict and verifying with command output.
806 1. Statically modfified router-id should take place
807 2. When static router-id is deleted highest loopback should
809 3. When loopback intf is down then highest physcial intf
810 should become router-id
814 * `tgen`: topogen object
815 * `topo`: input json file data
816 * `input_dict`: input dictionary, have details of Device Under Test, for
817 which user wants to test the data
820 # Verify if router-id for r1 is 12.12.12.12
823 "router_id": "12.12.12.12"
825 # Verify that router-id for r1 is highest interface ip
829 result = verify_router_id(tgen, topo, input_dict)
833 errormsg(str) or True
836 logger
.debug("Entering lib API: {}".format(sys
._getframe
().f_code
.co_name
))
837 for router
in input_dict
.keys():
838 if router
not in tgen
.routers():
841 rnode
= tgen
.routers()[router
]
843 del_router_id
= input_dict
[router
]["bgp"].setdefault("del_router_id", False)
845 logger
.info("Checking router %s router-id", router
)
846 show_bgp_json
= run_frr_cmd(rnode
, "show bgp summary json", isjson
=True)
847 router_id_out
= show_bgp_json
["ipv4Unicast"]["routerId"]
848 router_id_out
= ipaddr
.IPv4Address(unicode(router_id_out
))
850 # Once router-id is deleted, highest interface ip should become
853 router_id
= find_interface_with_greater_ip(topo
, router
)
855 router_id
= input_dict
[router
]["bgp"]["router_id"]
856 router_id
= ipaddr
.IPv4Address(unicode(router_id
))
858 if router_id
== router_id_out
:
859 logger
.info("Found expected router-id %s for router %s", router_id
, router
)
862 "Router-id for router:{} mismatch, expected:"
863 " {} but found:{}".format(router
, router_id
, router_id_out
)
867 logger
.debug("Exiting lib API: {}".format(sys
._getframe
().f_code
.co_name
))
871 @retry(attempts
=44, wait
=3, return_is_str
=True)
872 def verify_bgp_convergence(tgen
, topo
, dut
=None):
874 API will verify if BGP is converged with in the given time frame.
875 Running "show bgp summary json" command and verify bgp neighbor
876 state is established,
880 * `tgen`: topogen object
881 * `topo`: input json file data
882 * `dut`: device under test
886 # To veriry is BGP is converged for all the routers used in
888 results = verify_bgp_convergence(tgen, topo, dut="r1")
892 errormsg(str) or True
895 logger
.debug("Entering lib API: verify_bgp_convergence()")
896 for router
, rnode
in tgen
.routers().iteritems():
897 if "bgp" not in topo
["routers"][router
]:
900 if dut
is not None and dut
!= router
:
903 logger
.info("Verifying BGP Convergence on router %s:", router
)
904 show_bgp_json
= run_frr_cmd(rnode
, "show bgp vrf all summary json", isjson
=True)
905 # Verifying output dictionary show_bgp_json is empty or not
906 if not bool(show_bgp_json
):
907 errormsg
= "BGP is not running"
910 # To find neighbor ip type
911 bgp_data_list
= topo
["routers"][router
]["bgp"]
913 if type(bgp_data_list
) is not list:
914 bgp_data_list
= [bgp_data_list
]
916 for bgp_data
in bgp_data_list
:
917 if "vrf" in bgp_data
:
918 vrf
= bgp_data
["vrf"]
924 # To find neighbor ip type
925 bgp_addr_type
= bgp_data
["address_family"]
926 if "l2vpn" in bgp_addr_type
:
929 if "neighbor" not in bgp_addr_type
["l2vpn"]["evpn"]:
932 bgp_neighbors
= bgp_addr_type
["l2vpn"]["evpn"]["neighbor"]
933 total_evpn_peer
+= len(bgp_neighbors
)
936 for bgp_neighbor
, peer_data
in bgp_neighbors
.items():
937 for _addr_type
, dest_link_dict
in peer_data
.items():
938 data
= topo
["routers"][bgp_neighbor
]["links"]
939 for dest_link
in dest_link_dict
.keys():
940 if dest_link
in data
:
941 peer_details
= peer_data
[_addr_type
][dest_link
]
943 neighbor_ip
= data
[dest_link
][_addr_type
].split("/")[0]
947 "ipv4Unicast" in show_bgp_json
[vrf
]
948 or "ipv6Unicast" in show_bgp_json
[vrf
]
951 "[DUT: %s] VRF: %s, "
952 "ipv4Unicast/ipv6Unicast"
953 " address-family present"
954 " under l2vpn" % (router
, vrf
)
958 l2VpnEvpn_data
= show_bgp_json
[vrf
]["l2VpnEvpn"][
961 nh_state
= l2VpnEvpn_data
[neighbor_ip
]["state"]
963 if nh_state
== "Established":
966 if no_of_evpn_peer
== total_evpn_peer
:
968 "[DUT: %s] VRF: %s, BGP is Converged for " "epvn peers",
974 "[DUT: %s] VRF: %s, BGP is not converged "
975 "for evpn peers" % (router
, vrf
)
979 for addr_type
in bgp_addr_type
.keys():
980 if not check_address_types(addr_type
):
984 bgp_neighbors
= bgp_addr_type
[addr_type
]["unicast"]["neighbor"]
986 for bgp_neighbor
in bgp_neighbors
:
987 total_peer
+= len(bgp_neighbors
[bgp_neighbor
]["dest_link"])
989 for addr_type
in bgp_addr_type
.keys():
990 if not check_address_types(addr_type
):
992 bgp_neighbors
= bgp_addr_type
[addr_type
]["unicast"]["neighbor"]
995 for bgp_neighbor
, peer_data
in bgp_neighbors
.items():
996 for dest_link
in peer_data
["dest_link"].keys():
997 data
= topo
["routers"][bgp_neighbor
]["links"]
998 if dest_link
in data
:
999 peer_details
= peer_data
["dest_link"][dest_link
]
1000 # for link local neighbors
1002 "neighbor_type" in peer_details
1003 and peer_details
["neighbor_type"] == "link-local"
1005 neighbor_ip
= get_ipv6_linklocal_address(
1006 topo
["routers"], bgp_neighbor
, dest_link
1008 elif "source_link" in peer_details
:
1009 neighbor_ip
= topo
["routers"][bgp_neighbor
][
1011 ][peer_details
["source_link"]][addr_type
].split(
1017 "neighbor_type" in peer_details
1018 and peer_details
["neighbor_type"] == "unnumbered"
1020 neighbor_ip
= data
[dest_link
]["peer-interface"]
1022 neighbor_ip
= data
[dest_link
][addr_type
].split("/")[
1027 if addr_type
== "ipv4":
1028 ipv4_data
= show_bgp_json
[vrf
]["ipv4Unicast"][
1031 nh_state
= ipv4_data
[neighbor_ip
]["state"]
1033 ipv6_data
= show_bgp_json
[vrf
]["ipv6Unicast"][
1036 nh_state
= ipv6_data
[neighbor_ip
]["state"]
1038 if nh_state
== "Established":
1041 if no_of_peer
== total_peer
:
1042 logger
.info("[DUT: %s] VRF: %s, BGP is Converged", router
, vrf
)
1044 errormsg
= "[DUT: %s] VRF: %s, BGP is not converged" % (router
, vrf
)
1047 logger
.debug("Exiting API: verify_bgp_convergence()")
1051 @retry(attempts
=3, wait
=4, return_is_str
=True)
1052 def verify_bgp_community(
1053 tgen
, addr_type
, router
, network
, input_dict
=None, vrf
=None, bestpath
=False
1056 API to veiryf BGP large community is attached in route for any given
1057 DUT by running "show bgp ipv4/6 {route address} json" command.
1061 * `tgen`: topogen object
1062 * `addr_type` : ip type, ipv4/ipv6
1063 * `dut`: Device Under Test
1064 * `network`: network for which set criteria needs to be verified
1065 * `input_dict`: having details like - for which router, community and
1066 values needs to be verified
1068 * `bestpath`: To check best path cli
1072 networks = ["200.50.2.0/32"]
1074 "largeCommunity": "2:1:1 2:2:2 2:3:3 2:4:4 2:5:5"
1076 result = verify_bgp_community(tgen, "ipv4", dut, network, input_dict=None)
1080 errormsg(str) or True
1083 logger
.debug("Entering lib API: verify_bgp_community()")
1084 if router
not in tgen
.routers():
1087 rnode
= tgen
.routers()[router
]
1090 "Verifying BGP community attributes on dut %s: for %s " "network %s",
1096 command
= "show bgp"
1101 cmd
= "{} vrf {} {} {} json".format(command
, vrf
, addr_type
, net
)
1103 cmd
= "{} {} {} bestpath json".format(command
, addr_type
, net
)
1105 cmd
= "{} {} {} json".format(command
, addr_type
, net
)
1107 show_bgp_json
= run_frr_cmd(rnode
, cmd
, isjson
=True)
1108 if "paths" not in show_bgp_json
:
1109 return "Prefix {} not found in BGP table of router: {}".format(net
, router
)
1111 as_paths
= show_bgp_json
["paths"]
1113 for i
in range(len(as_paths
)):
1115 "largeCommunity" in show_bgp_json
["paths"][i
]
1116 or "community" in show_bgp_json
["paths"][i
]
1120 "Large Community attribute is found for route:" " %s in router: %s",
1124 if input_dict
is not None:
1125 for criteria
, comm_val
in input_dict
.items():
1126 show_val
= show_bgp_json
["paths"][i
][criteria
]["string"]
1127 if comm_val
== show_val
:
1129 "Verifying BGP %s for prefix: %s"
1130 " in router: %s, found expected"
1139 "Failed: Verifying BGP attribute"
1140 " {} for route: {} in router: {}"
1141 ", expected value: {} but found"
1142 ": {}".format(criteria
, net
, router
, comm_val
, show_val
)
1148 "Large Community attribute is not found for route: "
1149 "{} in router: {} ".format(net
, router
)
1153 logger
.debug("Exiting lib API: verify_bgp_community()")
1157 def modify_as_number(tgen
, topo
, input_dict
):
1159 API reads local_as and remote_as from user defined input_dict and
1160 modify router"s ASNs accordingly. Router"s config is modified and
1161 recent/changed config is loadeded to router.
1165 * `tgen` : Topogen object
1166 * `topo` : json file data
1167 * `input_dict` : defines for which router ASNs needs to be modified
1171 To modify ASNs for router r1
1178 result = modify_as_number(tgen, topo, input_dict)
1182 errormsg(str) or True
1185 logger
.debug("Entering lib API: {}".format(sys
._getframe
().f_code
.co_name
))
1188 new_topo
= deepcopy(topo
["routers"])
1190 for router
in input_dict
.keys():
1191 # Remove bgp configuration
1193 router_dict
.update({router
: {"bgp": {"delete": True}}})
1195 new_topo
[router
]["bgp"]["local_as"] = input_dict
[router
]["bgp"]["local_as"]
1197 logger
.info("Removing bgp configuration")
1198 create_router_bgp(tgen
, topo
, router_dict
)
1200 logger
.info("Applying modified bgp configuration")
1201 create_router_bgp(tgen
, new_topo
)
1203 except Exception as e
:
1204 # handle any exception
1205 logger
.error("Error %s occured. Arguments %s.", e
.message
, e
.args
)
1208 errormsg
= traceback
.format_exc()
1209 logger
.error(errormsg
)
1212 logger
.debug("Exiting lib API: {}".format(sys
._getframe
().f_code
.co_name
))
1216 @retry(attempts
=3, wait
=2, return_is_str
=True)
1217 def verify_as_numbers(tgen
, topo
, input_dict
):
1219 This API is to verify AS numbers for given DUT by running
1220 "show ip bgp neighbor json" command. Local AS and Remote AS
1221 will ve verified with input_dict data and command output.
1225 * `tgen`: topogen object
1226 * `topo`: input json file data
1227 * `addr_type` : ip type, ipv4/ipv6
1228 * `input_dict`: defines - for which router, AS numbers needs to be verified
1239 result = verify_as_numbers(tgen, topo, addr_type, input_dict)
1243 errormsg(str) or True
1246 logger
.debug("Entering lib API: {}".format(sys
._getframe
().f_code
.co_name
))
1247 for router
in input_dict
.keys():
1248 if router
not in tgen
.routers():
1251 rnode
= tgen
.routers()[router
]
1253 logger
.info("Verifying AS numbers for dut %s:", router
)
1255 show_ip_bgp_neighbor_json
= run_frr_cmd(
1256 rnode
, "show ip bgp neighbor json", isjson
=True
1258 local_as
= input_dict
[router
]["bgp"]["local_as"]
1259 bgp_addr_type
= topo
["routers"][router
]["bgp"]["address_family"]
1261 for addr_type
in bgp_addr_type
:
1262 if not check_address_types(addr_type
):
1265 bgp_neighbors
= bgp_addr_type
[addr_type
]["unicast"]["neighbor"]
1267 for bgp_neighbor
, peer_data
in bgp_neighbors
.iteritems():
1268 remote_as
= input_dict
[bgp_neighbor
]["bgp"]["local_as"]
1269 for dest_link
, peer_dict
in peer_data
["dest_link"].iteritems():
1271 data
= topo
["routers"][bgp_neighbor
]["links"]
1273 if dest_link
in data
:
1274 neighbor_ip
= data
[dest_link
][addr_type
].split("/")[0]
1275 neigh_data
= show_ip_bgp_neighbor_json
[neighbor_ip
]
1276 # Verify Local AS for router
1277 if neigh_data
["localAs"] != local_as
:
1279 "Failed: Verify local_as for dut {},"
1280 " found: {} but expected: {}".format(
1281 router
, neigh_data
["localAs"], local_as
1287 "Verified local_as for dut %s, found" " expected: %s",
1292 # Verify Remote AS for neighbor
1293 if neigh_data
["remoteAs"] != remote_as
:
1295 "Failed: Verify remote_as for dut "
1296 "{}'s neighbor {}, found: {} but "
1297 "expected: {}".format(
1298 router
, bgp_neighbor
, neigh_data
["remoteAs"], remote_as
1304 "Verified remote_as for dut %s's "
1305 "neighbor %s, found expected: %s",
1311 logger
.debug("Exiting lib API: {}".format(sys
._getframe
().f_code
.co_name
))
1315 def clear_bgp(tgen
, addr_type
, router
, vrf
=None):
1317 This API is to clear bgp neighborship by running
1318 clear ip bgp */clear bgp ipv6 * command,
1322 * `tgen`: topogen object
1323 * `addr_type`: ip type ipv4/ipv6
1324 * `router`: device under test
1329 clear_bgp(tgen, addr_type, "r1")
1332 logger
.debug("Entering lib API: {}".format(sys
._getframe
().f_code
.co_name
))
1334 if router
not in tgen
.routers():
1337 rnode
= tgen
.routers()[router
]
1340 if type(vrf
) is not list:
1344 logger
.info("Clearing BGP neighborship for router %s..", router
)
1345 if addr_type
== "ipv4":
1348 run_frr_cmd(rnode
, "clear ip bgp vrf {} *".format(_vrf
))
1350 run_frr_cmd(rnode
, "clear ip bgp *")
1351 elif addr_type
== "ipv6":
1354 run_frr_cmd(rnode
, "clear bgp vrf {} ipv6 *".format(_vrf
))
1356 run_frr_cmd(rnode
, "clear bgp ipv6 *")
1358 run_frr_cmd(rnode
, "clear bgp *")
1362 logger
.debug("Exiting lib API: {}".format(sys
._getframe
().f_code
.co_name
))
1365 def clear_bgp_and_verify(tgen
, topo
, router
):
1367 This API is to clear bgp neighborship and verify bgp neighborship
1368 is coming up(BGP is converged) usinf "show bgp summary json" command
1369 and also verifying for all bgp neighbors uptime before and after
1370 clear bgp sessions is different as the uptime must be changed once
1371 bgp sessions are cleared using "clear ip bgp */clear bgp ipv6 *" cmd.
1375 * `tgen`: topogen object
1376 * `topo`: input json file data
1377 * `router`: device under test
1381 result = clear_bgp_and_verify(tgen, topo, addr_type, dut)
1385 errormsg(str) or True
1388 logger
.debug("Entering lib API: {}".format(sys
._getframe
().f_code
.co_name
))
1390 if router
not in tgen
.routers():
1393 rnode
= tgen
.routers()[router
]
1395 peer_uptime_before_clear_bgp
= {}
1396 # Verifying BGP convergence before bgp clear command
1397 for retry
in range(44):
1399 # Waiting for BGP to converge
1401 "Waiting for %s sec for BGP to converge on router" " %s...",
1407 show_bgp_json
= run_frr_cmd(rnode
, "show bgp summary json", isjson
=True)
1408 # Verifying output dictionary show_bgp_json is empty or not
1409 if not bool(show_bgp_json
):
1410 errormsg
= "BGP is not running"
1413 # To find neighbor ip type
1414 bgp_addr_type
= topo
["routers"][router
]["bgp"]["address_family"]
1416 for addr_type
in bgp_addr_type
.keys():
1418 if not check_address_types(addr_type
):
1421 bgp_neighbors
= bgp_addr_type
[addr_type
]["unicast"]["neighbor"]
1423 for bgp_neighbor
in bgp_neighbors
:
1424 total_peer
+= len(bgp_neighbors
[bgp_neighbor
]["dest_link"])
1427 for addr_type
in bgp_addr_type
:
1428 bgp_neighbors
= bgp_addr_type
[addr_type
]["unicast"]["neighbor"]
1430 for bgp_neighbor
, peer_data
in bgp_neighbors
.iteritems():
1431 for dest_link
, peer_dict
in peer_data
["dest_link"].iteritems():
1432 data
= topo
["routers"][bgp_neighbor
]["links"]
1434 if dest_link
in data
:
1435 neighbor_ip
= data
[dest_link
][addr_type
].split("/")[0]
1436 if addr_type
== "ipv4":
1437 ipv4_data
= show_bgp_json
["ipv4Unicast"]["peers"]
1438 nh_state
= ipv4_data
[neighbor_ip
]["state"]
1440 # Peer up time dictionary
1441 peer_uptime_before_clear_bgp
[bgp_neighbor
] = ipv4_data
[
1443 ]["peerUptimeEstablishedEpoch"]
1445 ipv6_data
= show_bgp_json
["ipv6Unicast"]["peers"]
1446 nh_state
= ipv6_data
[neighbor_ip
]["state"]
1448 # Peer up time dictionary
1449 peer_uptime_before_clear_bgp
[bgp_neighbor
] = ipv6_data
[
1451 ]["peerUptimeEstablishedEpoch"]
1453 if nh_state
== "Established":
1456 if no_of_peer
== total_peer
:
1457 logger
.info("BGP is Converged for router %s before bgp" " clear", router
)
1461 "BGP is not yet Converged for router %s " "before bgp clear", router
1465 "TIMEOUT!! BGP is not converged in 30 seconds for"
1466 " router {}".format(router
)
1471 logger
.info("Clearing BGP neighborship for router %s..", router
)
1472 for addr_type
in bgp_addr_type
.keys():
1473 if addr_type
== "ipv4":
1474 run_frr_cmd(rnode
, "clear ip bgp *")
1475 elif addr_type
== "ipv6":
1476 run_frr_cmd(rnode
, "clear bgp ipv6 *")
1478 peer_uptime_after_clear_bgp
= {}
1479 # Verifying BGP convergence after bgp clear command
1480 for retry
in range(44):
1482 # Waiting for BGP to converge
1484 "Waiting for %s sec for BGP to converge on router" " %s...",
1490 show_bgp_json
= run_frr_cmd(rnode
, "show bgp summary json", isjson
=True)
1491 # Verifying output dictionary show_bgp_json is empty or not
1492 if not bool(show_bgp_json
):
1493 errormsg
= "BGP is not running"
1496 # To find neighbor ip type
1497 bgp_addr_type
= topo
["routers"][router
]["bgp"]["address_family"]
1499 for addr_type
in bgp_addr_type
.keys():
1500 if not check_address_types(addr_type
):
1503 bgp_neighbors
= bgp_addr_type
[addr_type
]["unicast"]["neighbor"]
1505 for bgp_neighbor
in bgp_neighbors
:
1506 total_peer
+= len(bgp_neighbors
[bgp_neighbor
]["dest_link"])
1509 for addr_type
in bgp_addr_type
:
1510 bgp_neighbors
= bgp_addr_type
[addr_type
]["unicast"]["neighbor"]
1512 for bgp_neighbor
, peer_data
in bgp_neighbors
.iteritems():
1513 for dest_link
, peer_dict
in peer_data
["dest_link"].iteritems():
1514 data
= topo
["routers"][bgp_neighbor
]["links"]
1516 if dest_link
in data
:
1517 neighbor_ip
= data
[dest_link
][addr_type
].split("/")[0]
1518 if addr_type
== "ipv4":
1519 ipv4_data
= show_bgp_json
["ipv4Unicast"]["peers"]
1520 nh_state
= ipv4_data
[neighbor_ip
]["state"]
1521 peer_uptime_after_clear_bgp
[bgp_neighbor
] = ipv4_data
[
1523 ]["peerUptimeEstablishedEpoch"]
1525 ipv6_data
= show_bgp_json
["ipv6Unicast"]["peers"]
1526 nh_state
= ipv6_data
[neighbor_ip
]["state"]
1527 # Peer up time dictionary
1528 peer_uptime_after_clear_bgp
[bgp_neighbor
] = ipv6_data
[
1530 ]["peerUptimeEstablishedEpoch"]
1532 if nh_state
== "Established":
1535 if no_of_peer
== total_peer
:
1536 logger
.info("BGP is Converged for router %s after bgp clear", router
)
1540 "BGP is not yet Converged for router %s after" " bgp clear", router
1544 "TIMEOUT!! BGP is not converged in 30 seconds for"
1545 " router {}".format(router
)
1549 # Comparing peerUptimeEstablishedEpoch dictionaries
1550 if peer_uptime_before_clear_bgp
!= peer_uptime_after_clear_bgp
:
1551 logger
.info("BGP neighborship is reset after clear BGP on router %s", router
)
1554 "BGP neighborship is not reset after clear bgp on router"
1555 " {}".format(router
)
1559 logger
.debug("Exiting lib API: {}".format(sys
._getframe
().f_code
.co_name
))
1563 def verify_bgp_timers_and_functionality(tgen
, topo
, input_dict
):
1565 To verify BGP timer config, execute "show ip bgp neighbor json" command
1566 and verify bgp timers with input_dict data.
1567 To veirfy bgp timers functonality, shutting down peer interface
1568 and verify BGP neighborship status.
1572 * `tgen`: topogen object
1573 * `topo`: input json file data
1574 * `addr_type`: ip type, ipv4/ipv6
1575 * `input_dict`: defines for which router, bgp timers needs to be verified
1578 # To verify BGP timers for neighbor r2 of router r1
1584 "keepalivetimer": 5,
1585 "holddowntimer": 15,
1587 result = verify_bgp_timers_and_functionality(tgen, topo, "ipv4",
1592 errormsg(str) or True
1595 logger
.debug("Entering lib API: {}".format(sys
._getframe
().f_code
.co_name
))
1597 router_list
= tgen
.routers()
1598 for router
in input_dict
.keys():
1599 if router
not in router_list
:
1602 rnode
= router_list
[router
]
1604 logger
.info("Verifying bgp timers functionality, DUT is %s:", router
)
1606 show_ip_bgp_neighbor_json
= run_frr_cmd(
1607 rnode
, "show ip bgp neighbor json", isjson
=True
1610 bgp_addr_type
= input_dict
[router
]["bgp"]["address_family"]
1612 for addr_type
in bgp_addr_type
:
1613 if not check_address_types(addr_type
):
1616 bgp_neighbors
= bgp_addr_type
[addr_type
]["unicast"]["neighbor"]
1617 for bgp_neighbor
, peer_data
in bgp_neighbors
.iteritems():
1618 for dest_link
, peer_dict
in peer_data
["dest_link"].iteritems():
1619 data
= topo
["routers"][bgp_neighbor
]["links"]
1621 keepalivetimer
= peer_dict
["keepalivetimer"]
1622 holddowntimer
= peer_dict
["holddowntimer"]
1624 if dest_link
in data
:
1625 neighbor_ip
= data
[dest_link
][addr_type
].split("/")[0]
1626 neighbor_intf
= data
[dest_link
]["interface"]
1628 # Verify HoldDownTimer for neighbor
1629 bgpHoldTimeMsecs
= show_ip_bgp_neighbor_json
[neighbor_ip
][
1630 "bgpTimerHoldTimeMsecs"
1632 if bgpHoldTimeMsecs
!= holddowntimer
* 1000:
1634 "Verifying holddowntimer for bgp "
1635 "neighbor {} under dut {}, found: {} "
1636 "but expected: {}".format(
1640 holddowntimer
* 1000,
1645 # Verify KeepAliveTimer for neighbor
1646 bgpKeepAliveTimeMsecs
= show_ip_bgp_neighbor_json
[neighbor_ip
][
1647 "bgpTimerKeepAliveIntervalMsecs"
1649 if bgpKeepAliveTimeMsecs
!= keepalivetimer
* 1000:
1651 "Verifying keepalivetimer for bgp "
1652 "neighbor {} under dut {}, found: {} "
1653 "but expected: {}".format(
1656 bgpKeepAliveTimeMsecs
,
1657 keepalivetimer
* 1000,
1662 ####################
1663 # Shutting down peer interface after keepalive time and
1664 # after some time bringing up peer interface.
1665 # verifying BGP neighborship in (hold down-keep alive)
1666 # time, it should not go down
1667 ####################
1669 # Wait till keep alive time
1670 logger
.info("=" * 20)
1671 logger
.info("Scenario 1:")
1673 "Shutdown and bring up peer interface: %s "
1674 "in keep alive time : %s sec and verify "
1675 " BGP neighborship is intact in %s sec ",
1678 (holddowntimer
- keepalivetimer
),
1680 logger
.info("=" * 20)
1681 logger
.info("Waiting for %s sec..", keepalivetimer
)
1682 sleep(keepalivetimer
)
1684 # Shutting down peer ineterface
1686 "Shutting down interface %s on router %s",
1690 topotest
.interface_set_status(
1691 router_list
[bgp_neighbor
], neighbor_intf
, ifaceaction
=False
1694 # Bringing up peer interface
1697 "Bringing up interface %s on router %s..",
1701 topotest
.interface_set_status(
1702 router_list
[bgp_neighbor
], neighbor_intf
, ifaceaction
=True
1705 # Verifying BGP neighborship is intact in
1706 # (holddown - keepalive) time
1708 keepalivetimer
, holddowntimer
, int(holddowntimer
/ 3)
1710 logger
.info("Waiting for %s sec..", keepalivetimer
)
1711 sleep(keepalivetimer
)
1713 show_bgp_json
= run_frr_cmd(
1714 rnode
, "show bgp summary json", isjson
=True
1717 if addr_type
== "ipv4":
1718 ipv4_data
= show_bgp_json
["ipv4Unicast"]["peers"]
1719 nh_state
= ipv4_data
[neighbor_ip
]["state"]
1721 ipv6_data
= show_bgp_json
["ipv6Unicast"]["peers"]
1722 nh_state
= ipv6_data
[neighbor_ip
]["state"]
1724 if timer
== (holddowntimer
- keepalivetimer
):
1725 if nh_state
!= "Established":
1727 "BGP neighborship has not gone "
1728 "down in {} sec for neighbor {}".format(
1735 "BGP neighborship is intact in %s"
1736 " sec for neighbor %s",
1741 ####################
1742 # Shutting down peer interface and verifying that BGP
1743 # neighborship is going down in holddown time
1744 ####################
1745 logger
.info("=" * 20)
1746 logger
.info("Scenario 2:")
1748 "Shutdown peer interface: %s and verify BGP"
1749 " neighborship has gone down in hold down "
1754 logger
.info("=" * 20)
1757 "Shutting down interface %s on router %s..",
1761 topotest
.interface_set_status(
1762 router_list
[bgp_neighbor
], neighbor_intf
, ifaceaction
=False
1765 # Verifying BGP neighborship is going down in holddown time
1768 (holddowntimer
+ keepalivetimer
),
1769 int(holddowntimer
/ 3),
1771 logger
.info("Waiting for %s sec..", keepalivetimer
)
1772 sleep(keepalivetimer
)
1774 show_bgp_json
= run_frr_cmd(
1775 rnode
, "show bgp summary json", isjson
=True
1778 if addr_type
== "ipv4":
1779 ipv4_data
= show_bgp_json
["ipv4Unicast"]["peers"]
1780 nh_state
= ipv4_data
[neighbor_ip
]["state"]
1782 ipv6_data
= show_bgp_json
["ipv6Unicast"]["peers"]
1783 nh_state
= ipv6_data
[neighbor_ip
]["state"]
1785 if timer
== holddowntimer
:
1786 if nh_state
== "Established":
1788 "BGP neighborship has not gone "
1789 "down in {} sec for neighbor {}".format(
1796 "BGP neighborship has gone down in"
1797 " %s sec for neighbor %s",
1802 logger
.debug("Exiting lib API: {}".format(sys
._getframe
().f_code
.co_name
))
1806 @retry(attempts
=3, wait
=4, return_is_str
=True)
1807 def verify_bgp_attributes(
1808 tgen
, addr_type
, dut
, static_routes
, rmap_name
, input_dict
, seq_id
=None
1811 API will verify BGP attributes set by Route-map for given prefix and
1812 DUT. it will run "show bgp ipv4/ipv6 {prefix_address} json" command
1813 in DUT to verify BGP attributes set by route-map, Set attributes
1814 values will be read from input_dict and verified with command output.
1816 * `tgen`: topogen object
1817 * `addr_type` : ip type, ipv4/ipv6
1818 * `dut`: Device Under Test
1819 * `static_routes`: Static Routes for which BGP set attributes needs to be
1821 * `rmap_name`: route map name for which set criteria needs to be verified
1822 * `input_dict`: defines for which router, AS numbers needs
1823 * `seq_id`: sequence number of rmap, default is None
1830 "rmap_match_pf_1_ipv4": [{
1835 "prefix_lists": "pf_list_1_" + addr_type
1843 "rmap_match_pf_2_ipv6": [{
1848 "prefix_lists": "pf_list_1_" + addr_type
1858 result = verify_bgp_attributes(tgen, 'ipv4', "r1", "10.0.20.1/32",
1859 rmap_match_pf_1_ipv4, input_dict)
1863 errormsg(str) or True
1866 logger
.debug("Entering lib API: {}".format(sys
._getframe
().f_code
.co_name
))
1867 for router
, rnode
in tgen
.routers().iteritems():
1871 logger
.info("Verifying BGP set attributes for dut {}:".format(router
))
1873 for static_route
in static_routes
:
1874 cmd
= "show bgp {} {} json".format(addr_type
, static_route
)
1875 show_bgp_json
= run_frr_cmd(rnode
, cmd
, isjson
=True)
1879 for rmap_router
in input_dict
.keys():
1880 for rmap
, values
in input_dict
[rmap_router
]["route_maps"].items():
1881 if rmap
== rmap_name
:
1882 dict_to_test
= values
1883 for rmap_dict
in values
:
1884 if seq_id
is not None:
1885 if type(seq_id
) is not list:
1888 if "seq_id" in rmap_dict
:
1889 rmap_seq_id
= rmap_dict
["seq_id"]
1890 for _seq_id
in seq_id
:
1891 if _seq_id
== rmap_seq_id
:
1892 tmp_list
.append(rmap_dict
)
1894 dict_to_test
= tmp_list
1896 for rmap_dict
in dict_to_test
:
1897 if "set" in rmap_dict
:
1898 for criteria
in rmap_dict
["set"].keys():
1899 if criteria
not in show_bgp_json
["paths"][0]:
1904 "in router {}".format(criteria
, cmd
, router
)
1909 rmap_dict
["set"][criteria
]
1910 == show_bgp_json
["paths"][0][criteria
]
1922 rmap_dict
["set"][criteria
],
1927 "Failed: Verifying BGP "
1928 "attribute {} for route:"
1929 " {} in router: {}, "
1930 " expected value: {} but"
1931 " found: {}".format(
1935 rmap_dict
["set"][criteria
],
1936 show_bgp_json
["paths"][0][criteria
],
1941 logger
.debug("Exiting lib API: {}".format(sys
._getframe
().f_code
.co_name
))
1945 @retry(attempts
=4, wait
=2, return_is_str
=True, initial_wait
=2)
1946 def verify_best_path_as_per_bgp_attribute(
1947 tgen
, addr_type
, router
, input_dict
, attribute
1950 API is to verify best path according to BGP attributes for given routes.
1951 "show bgp ipv4/6 json" command will be run and verify best path according
1952 to shortest as-path, highest local-preference and med, lowest weight and
1953 route origin IGP>EGP>INCOMPLETE.
1956 * `tgen` : topogen object
1957 * `addr_type` : ip type, ipv4/ipv6
1958 * `tgen` : topogen object
1959 * `attribute` : calculate best path using this attribute
1960 * `input_dict`: defines different routes to calculate for which route
1961 best path is selected
1964 # To verify best path for routes 200.50.2.0/32 and 200.60.2.0/32 from
1965 router r7 to router r1(DUT) as per shortest as-path attribute
1972 "advertise_networks": [
1974 "network": "200.50.2.0/32"
1977 "network": "200.60.2.0/32"
1986 attribute = "locPrf"
1987 result = verify_best_path_as_per_bgp_attribute(tgen, "ipv4", dut, \
1988 input_dict, attribute)
1991 errormsg(str) or True
1994 logger
.debug("Entering lib API: {}".format(sys
._getframe
().f_code
.co_name
))
1996 if router
not in tgen
.routers():
1999 rnode
= tgen
.routers()[router
]
2001 # Verifying show bgp json
2002 command
= "show bgp"
2005 logger
.info("Verifying router %s RIB for best path:", router
)
2007 static_route
= False
2008 advertise_network
= False
2009 for route_val
in input_dict
.values():
2010 if "static_routes" in route_val
:
2012 networks
= route_val
["static_routes"]
2014 advertise_network
= True
2015 net_data
= route_val
["bgp"]["address_family"][addr_type
]["unicast"]
2016 networks
= net_data
["advertise_networks"]
2018 for network
in networks
:
2019 _network
= network
["network"]
2020 no_of_ip
= network
.setdefault("no_of_ip", 1)
2021 vrf
= network
.setdefault("vrf", None)
2024 cmd
= "{} vrf {}".format(command
, vrf
)
2028 cmd
= "{} {}".format(cmd
, addr_type
)
2029 cmd
= "{} json".format(cmd
)
2030 sh_ip_bgp_json
= run_frr_cmd(rnode
, cmd
, isjson
=True)
2032 routes
= generate_ips(_network
, no_of_ip
)
2033 for route
in routes
:
2034 route
= str(ipaddr
.IPNetwork(unicode(route
)))
2036 if route
in sh_ip_bgp_json
["routes"]:
2037 route_attributes
= sh_ip_bgp_json
["routes"][route
]
2041 for route_attribute
in route_attributes
:
2042 next_hops
= route_attribute
["nexthops"]
2043 for next_hop
in next_hops
:
2044 next_hop_ip
= next_hop
["ip"]
2045 attribute_dict
[next_hop_ip
] = route_attribute
[attribute
]
2048 if attribute
== "path":
2049 # Find next_hop for the route have minimum as_path
2051 attribute_dict
, key
=lambda x
: len(set(attribute_dict
[x
]))
2053 compare
= "SHORTEST"
2055 # LOCAL_PREF attribute
2056 elif attribute
== "locPrf":
2057 # Find next_hop for the route have highest local preference
2059 attribute_dict
, key
=(lambda k
: attribute_dict
[k
])
2064 elif attribute
== "weight":
2065 # Find next_hop for the route have highest weight
2067 attribute_dict
, key
=(lambda k
: attribute_dict
[k
])
2072 elif attribute
== "origin":
2073 # Find next_hop for the route have IGP as origin, -
2074 # - rule is IGP>EGP>INCOMPLETE
2077 for (key
, value
) in attribute_dict
.iteritems()
2083 elif attribute
== "metric":
2084 # Find next_hop for the route have LOWEST MED
2086 attribute_dict
, key
=(lambda k
: attribute_dict
[k
])
2091 if addr_type
== "ipv4":
2092 command_1
= "show ip route"
2094 command_1
= "show ipv6 route"
2097 cmd
= "{} vrf {} json".format(command_1
, vrf
)
2099 cmd
= "{} json".format(command_1
)
2101 rib_routes_json
= run_frr_cmd(rnode
, cmd
, isjson
=True)
2103 # Verifying output dictionary rib_routes_json is not empty
2104 if not bool(rib_routes_json
):
2105 errormsg
= "No route found in RIB of router {}..".format(router
)
2110 # Find best is installed in RIB
2111 if route
in rib_routes_json
:
2113 # Verify next_hop in rib_routes_json
2115 rib_routes_json
[route
][0]["nexthops"][0]["ip"]
2121 "Incorrect Nexthop for BGP route {} in "
2122 "RIB of router {}, Expected: {}, Found:"
2126 rib_routes_json
[route
][0]["nexthops"][0]["ip"],
2132 if st_found
and nh_found
:
2134 "Best path for prefix: %s with next_hop: %s is "
2135 "installed according to %s %s: (%s) in RIB of "
2141 attribute_dict
[_next_hop
],
2145 logger
.debug("Exiting lib API: {}".format(sys
._getframe
().f_code
.co_name
))
2149 def verify_best_path_as_per_admin_distance(
2150 tgen
, addr_type
, router
, input_dict
, attribute
2153 API is to verify best path according to admin distance for given
2154 route. "show ip/ipv6 route json" command will be run and verify
2155 best path accoring to shortest admin distanc.
2159 * `addr_type` : ip type, ipv4/ipv6
2160 * `dut`: Device Under Test
2161 * `tgen` : topogen object
2162 * `attribute` : calculate best path using admin distance
2163 * `input_dict`: defines different routes with different admin distance
2164 to calculate for which route best path is selected
2167 # To verify best path for route 200.50.2.0/32 from router r2 to
2168 router r1(DUT) as per shortest admin distance which is 60.
2171 "static_routes": [{"network": "200.50.2.0/32", \
2172 "admin_distance": 80, "next_hop": "10.0.0.14"},
2173 {"network": "200.50.2.0/32", \
2174 "admin_distance": 60, "next_hop": "10.0.0.18"}]
2176 attribute = "locPrf"
2177 result = verify_best_path_as_per_admin_distance(tgen, "ipv4", dut, \
2178 input_dict, attribute):
2181 errormsg(str) or True
2184 logger
.debug("Entering lib API: {}".format(sys
._getframe
().f_code
.co_name
))
2185 router_list
= tgen
.routers()
2186 if router
not in router_list
:
2189 rnode
= tgen
.routers()[router
]
2192 logger
.info("Verifying router %s RIB for best path:", router
)
2195 if addr_type
== "ipv4":
2196 command
= "show ip route json"
2198 command
= "show ipv6 route json"
2200 for routes_from_router
in input_dict
.keys():
2201 sh_ip_route_json
= router_list
[routes_from_router
].vtysh_cmd(
2202 command
, isjson
=True
2204 networks
= input_dict
[routes_from_router
]["static_routes"]
2205 for network
in networks
:
2206 route
= network
["network"]
2208 route_attributes
= sh_ip_route_json
[route
]
2212 for route_attribute
in route_attributes
:
2213 next_hops
= route_attribute
["nexthops"]
2214 for next_hop
in next_hops
:
2215 next_hop_ip
= next_hop
["ip"]
2216 attribute_dict
[next_hop_ip
] = route_attribute
["distance"]
2218 # Find next_hop for the route have LOWEST Admin Distance
2219 _next_hop
= min(attribute_dict
, key
=(lambda k
: attribute_dict
[k
]))
2223 rib_routes_json
= run_frr_cmd(rnode
, command
, isjson
=True)
2225 # Verifying output dictionary rib_routes_json is not empty
2226 if not bool(rib_routes_json
):
2227 errormsg
= "No route found in RIB of router {}..".format(router
)
2232 # Find best is installed in RIB
2233 if route
in rib_routes_json
:
2235 # Verify next_hop in rib_routes_json
2236 if rib_routes_json
[route
][0]["nexthops"][0]["ip"] == _next_hop
:
2240 "Nexthop {} is Missing for BGP route {}"
2241 " in RIB of router {}\n".format(_next_hop
, route
, router
)
2245 if st_found
and nh_found
:
2247 "Best path for prefix: %s is installed according"
2248 " to %s %s: (%s) in RIB of router %s",
2252 attribute_dict
[_next_hop
],
2256 logger
.debug("Exiting lib API: {}".format(sys
._getframe
().f_code
.co_name
))
2260 @retry(attempts
=5, wait
=2, return_is_str
=True, initial_wait
=2)
2261 def verify_bgp_rib(tgen
, addr_type
, dut
, input_dict
, next_hop
=None, aspath
=None):
2263 This API is to verify whether bgp rib has any
2264 matching route for a nexthop.
2268 * `tgen`: topogen object
2269 * `dut`: input dut router name
2270 * `addr_type` : ip type ipv4/ipv6
2271 * `input_dict` : input dict, has details of static routes
2272 * `next_hop`[optional]: next_hop which needs to be verified,
2274 * 'aspath'[optional]: aspath which needs to be verified
2279 next_hop = "192.168.1.10"
2280 input_dict = topo['routers']
2281 aspath = "100 200 300"
2282 result = verify_bgp_rib(tgen, addr_type, dut, tgen, input_dict,
2287 errormsg(str) or True
2290 logger
.debug("Entering lib API: verify_bgp_rib()")
2292 router_list
= tgen
.routers()
2293 additional_nexthops_in_required_nhs
= []
2296 for routerInput
in input_dict
.keys():
2297 for router
, rnode
in router_list
.iteritems():
2301 # Verifying RIB routes
2302 command
= "show bgp"
2306 logger
.info("Checking router {} BGP RIB:".format(dut
))
2308 if "static_routes" in input_dict
[routerInput
]:
2309 static_routes
= input_dict
[routerInput
]["static_routes"]
2311 for static_route
in static_routes
:
2316 vrf
= static_route
.setdefault("vrf", None)
2318 cmd
= "{} vrf {} {}".format(command
, vrf
, addr_type
)
2321 cmd
= "{} {}".format(command
, addr_type
)
2323 cmd
= "{} json".format(cmd
)
2325 rib_routes_json
= run_frr_cmd(rnode
, cmd
, isjson
=True)
2327 # Verifying output dictionary rib_routes_json is not empty
2328 if bool(rib_routes_json
) == False:
2329 errormsg
= "No route found in rib of router {}..".format(router
)
2332 network
= static_route
["network"]
2334 if "no_of_ip" in static_route
:
2335 no_of_ip
= static_route
["no_of_ip"]
2339 # Generating IPs for verification
2340 ip_list
= generate_ips(network
, no_of_ip
)
2342 for st_rt
in ip_list
:
2343 st_rt
= str(ipaddr
.IPNetwork(unicode(st_rt
)))
2345 _addr_type
= validate_ip_address(st_rt
)
2346 if _addr_type
!= addr_type
:
2349 if st_rt
in rib_routes_json
["routes"]:
2351 found_routes
.append(st_rt
)
2354 if not isinstance(next_hop
, list):
2355 next_hop
= [next_hop
]
2359 for rib_r
in rib_routes_json
["routes"][st_rt
][0][
2364 missing_list_of_nexthops
= set(list2
).difference(list1
)
2365 additional_nexthops_in_required_nhs
= set(
2370 if additional_nexthops_in_required_nhs
:
2372 "Missing nexthop %s for route"
2373 " %s in RIB of router %s\n",
2374 additional_nexthops_in_required_nhs
,
2379 "Nexthop {} is Missing for "
2380 "route {} in RIB of router {}\n".format(
2381 additional_nexthops_in_required_nhs
,
2390 found_paths
= rib_routes_json
["routes"][st_rt
][0][
2393 if aspath
== found_paths
:
2396 "Found AS path {} for route"
2397 " {} in RIB of router "
2398 "{}\n".format(aspath
, st_rt
, dut
)
2402 "AS Path {} is missing for route"
2403 "for route {} in RIB of router {}\n".format(
2410 missing_routes
.append(st_rt
)
2414 "Found next_hop {} for all bgp"
2416 " router {}\n".format(next_hop
, router
)
2419 if len(missing_routes
) > 0:
2421 "Missing route in RIB of router {}, "
2422 "routes: {}\n".format(dut
, missing_routes
)
2428 "Verified routes in router {} BGP RIB, "
2429 "found routes are: {} \n".format(dut
, found_routes
)
2433 if "bgp" not in input_dict
[routerInput
]:
2436 # Advertise networks
2437 bgp_data_list
= input_dict
[routerInput
]["bgp"]
2439 if type(bgp_data_list
) is not list:
2440 bgp_data_list
= [bgp_data_list
]
2442 for bgp_data
in bgp_data_list
:
2443 vrf_id
= bgp_data
.setdefault("vrf", None)
2445 cmd
= "{} vrf {} {}".format(command
, vrf_id
, addr_type
)
2447 cmd
= "{} {}".format(command
, addr_type
)
2449 cmd
= "{} json".format(cmd
)
2451 rib_routes_json
= run_frr_cmd(rnode
, cmd
, isjson
=True)
2453 # Verifying output dictionary rib_routes_json is not empty
2454 if bool(rib_routes_json
) == False:
2455 errormsg
= "No route found in rib of router {}..".format(router
)
2458 bgp_net_advertise
= bgp_data
["address_family"][addr_type
]["unicast"]
2459 advertise_network
= bgp_net_advertise
.setdefault(
2460 "advertise_networks", []
2463 for advertise_network_dict
in advertise_network
:
2468 network
= advertise_network_dict
["network"]
2470 if "no_of_network" in advertise_network_dict
:
2471 no_of_network
= advertise_network_dict
["no_of_network"]
2475 # Generating IPs for verification
2476 ip_list
= generate_ips(network
, no_of_network
)
2478 for st_rt
in ip_list
:
2479 st_rt
= str(ipaddr
.IPNetwork(unicode(st_rt
)))
2481 _addr_type
= validate_ip_address(st_rt
)
2482 if _addr_type
!= addr_type
:
2485 if st_rt
in rib_routes_json
["routes"]:
2487 found_routes
.append(st_rt
)
2490 missing_routes
.append(st_rt
)
2492 if len(missing_routes
) > 0:
2494 "Missing route in BGP RIB of router {},"
2495 " are: {}\n".format(dut
, missing_routes
)
2501 "Verified routes in router {} BGP RIB, found "
2502 "routes are: {}\n".format(dut
, found_routes
)
2505 logger
.debug("Exiting lib API: verify_bgp_rib()")
2509 @retry(attempts
=4, wait
=2, return_is_str
=True, initial_wait
=2)
2510 def verify_graceful_restart(tgen
, topo
, addr_type
, input_dict
, dut
, peer
):
2512 This API is to verify verify_graceful_restart configuration of DUT and
2513 cross verify the same from the peer bgp routerrouter.
2517 * `tgen`: topogen object
2518 * `topo`: input json file data
2519 * `addr_type` : ip type ipv4/ipv6
2520 * `input_dict`: input dictionary, have details of Device Under Test, for
2521 which user wants to test the data
2522 * `dut`: input dut router name
2523 * `peer`: input peer router name
2536 "graceful-restart": True
2549 "graceful-restart": True
2561 result = verify_graceful_restart(tgen, topo, addr_type, input_dict,
2562 dut = "r1", peer = 'r2')
2565 errormsg(str) or True
2568 logger
.debug("Entering lib API: {}".format(sys
._getframe
().f_code
.co_name
))
2570 for router
, rnode
in tgen
.routers().iteritems():
2574 bgp_addr_type
= topo
["routers"][dut
]["bgp"]["address_family"]
2576 if addr_type
in bgp_addr_type
:
2577 if not check_address_types(addr_type
):
2580 bgp_neighbors
= bgp_addr_type
[addr_type
]["unicast"]["neighbor"]
2582 for bgp_neighbor
, peer_data
in bgp_neighbors
.items():
2583 if bgp_neighbor
!= peer
:
2586 for dest_link
, peer_dict
in peer_data
["dest_link"].items():
2587 data
= topo
["routers"][bgp_neighbor
]["links"]
2589 if dest_link
in data
:
2590 neighbor_ip
= data
[dest_link
][addr_type
].split("/")[0]
2593 "[DUT: {}]: Checking bgp graceful-restart show"
2594 " o/p {}".format(dut
, neighbor_ip
)
2597 show_bgp_graceful_json
= None
2599 show_bgp_graceful_json
= run_frr_cmd(
2601 "show bgp {} neighbor {} graceful-restart json".format(
2602 addr_type
, neighbor_ip
2607 show_bgp_graceful_json_out
= show_bgp_graceful_json
[neighbor_ip
]
2609 if show_bgp_graceful_json_out
["neighborAddr"] == neighbor_ip
:
2611 "[DUT: {}]: Neighbor ip matched {}".format(dut
, neighbor_ip
)
2614 errormsg
= "[DUT: {}]: Neighbor ip NOT a matched {}".format(
2622 if "address_family" in input_dict
[dut
]["bgp"]:
2623 bgp_neighbors
= input_dict
[dut
]["bgp"]["address_family"][addr_type
][
2625 ]["neighbor"][peer
]["dest_link"]
2627 for dest_link
, data
in bgp_neighbors
.items():
2629 "graceful-restart-helper" in data
2630 and data
["graceful-restart-helper"]
2633 elif "graceful-restart" in data
and data
["graceful-restart"]:
2636 "graceful-restart-disable" in data
2637 and data
["graceful-restart-disable"]
2644 if "graceful-restart" in input_dict
[dut
]["bgp"]:
2647 "graceful-restart" in input_dict
[dut
]["bgp"]["graceful-restart"]
2648 and input_dict
[dut
]["bgp"]["graceful-restart"][
2654 "graceful-restart-disable"
2655 in input_dict
[dut
]["bgp"]["graceful-restart"]
2656 and input_dict
[dut
]["bgp"]["graceful-restart"][
2657 "graceful-restart-disable"
2666 if lmode
== "Disable" or lmode
== "Disable*":
2670 if "address_family" in input_dict
[peer
]["bgp"]:
2671 bgp_neighbors
= input_dict
[peer
]["bgp"]["address_family"][addr_type
][
2673 ]["neighbor"][dut
]["dest_link"]
2675 for dest_link
, data
in bgp_neighbors
.items():
2677 "graceful-restart-helper" in data
2678 and data
["graceful-restart-helper"]
2681 elif "graceful-restart" in data
and data
["graceful-restart"]:
2684 "graceful-restart-disable" in data
2685 and data
["graceful-restart-disable"]
2692 if "graceful-restart" in input_dict
[peer
]["bgp"]:
2696 in input_dict
[peer
]["bgp"]["graceful-restart"]
2697 and input_dict
[peer
]["bgp"]["graceful-restart"][
2703 "graceful-restart-disable"
2704 in input_dict
[peer
]["bgp"]["graceful-restart"]
2705 and input_dict
[peer
]["bgp"]["graceful-restart"][
2706 "graceful-restart-disable"
2715 if show_bgp_graceful_json_out
["localGrMode"] == lmode
:
2717 "[DUT: {}]: localGrMode : {} ".format(
2718 dut
, show_bgp_graceful_json_out
["localGrMode"]
2723 "[DUT: {}]: localGrMode is not correct"
2724 " Expected: {}, Found: {}".format(
2725 dut
, lmode
, show_bgp_graceful_json_out
["localGrMode"]
2730 if show_bgp_graceful_json_out
["remoteGrMode"] == rmode
:
2732 "[DUT: {}]: remoteGrMode : {} ".format(
2733 dut
, show_bgp_graceful_json_out
["remoteGrMode"]
2737 show_bgp_graceful_json_out
["remoteGrMode"] == "NotApplicable"
2738 and rmode
== "Disable"
2741 "[DUT: {}]: remoteGrMode : {} ".format(
2742 dut
, show_bgp_graceful_json_out
["remoteGrMode"]
2747 "[DUT: {}]: remoteGrMode is not correct"
2748 " Expected: {}, Found: {}".format(
2749 dut
, rmode
, show_bgp_graceful_json_out
["remoteGrMode"]
2754 logger
.debug("Exiting lib API: {}".format(sys
._getframe
().f_code
.co_name
))
2758 @retry(attempts
=4, wait
=2, return_is_str
=True, initial_wait
=2)
2759 def verify_r_bit(tgen
, topo
, addr_type
, input_dict
, dut
, peer
):
2761 This API is to verify r_bit in the BGP gr capability advertised
2762 by the neighbor router
2766 * `tgen`: topogen object
2767 * `topo`: input json file data
2768 * `addr_type` : ip type ipv4/ipv6
2769 * `input_dict`: input dictionary, have details of Device Under Test, for
2770 which user wants to test the data
2771 * `dut`: input dut router name
2785 "graceful-restart": True
2798 "graceful-restart": True
2809 result = verify_r_bit(tgen, topo, addr_type, input_dict, dut, peer)
2813 errormsg(str) or True
2816 logger
.debug("Entering lib API: {}".format(sys
._getframe
().f_code
.co_name
))
2818 for router
, rnode
in tgen
.routers().iteritems():
2822 bgp_addr_type
= topo
["routers"][router
]["bgp"]["address_family"]
2824 if addr_type
in bgp_addr_type
:
2825 if not check_address_types(addr_type
):
2828 bgp_neighbors
= bgp_addr_type
[addr_type
]["unicast"]["neighbor"]
2830 for bgp_neighbor
, peer_data
in bgp_neighbors
.items():
2831 if bgp_neighbor
!= peer
:
2834 for dest_link
, peer_dict
in peer_data
["dest_link"].items():
2835 data
= topo
["routers"][bgp_neighbor
]["links"]
2837 if dest_link
in data
:
2838 neighbor_ip
= data
[dest_link
][addr_type
].split("/")[0]
2841 "[DUT: {}]: Checking bgp graceful-restart show"
2842 " o/p {}".format(dut
, neighbor_ip
)
2845 show_bgp_graceful_json
= run_frr_cmd(
2847 "show bgp {} neighbor {} graceful-restart json".format(
2848 addr_type
, neighbor_ip
2853 show_bgp_graceful_json_out
= show_bgp_graceful_json
[neighbor_ip
]
2855 if show_bgp_graceful_json_out
["neighborAddr"] == neighbor_ip
:
2857 "[DUT: {}]: Neighbor ip matched {}".format(dut
, neighbor_ip
)
2860 errormsg
= "[DUT: {}]: Neighbor ip NOT a matched {}".format(
2865 if "rBit" in show_bgp_graceful_json_out
:
2866 if show_bgp_graceful_json_out
["rBit"]:
2867 logger
.info("[DUT: {}]: Rbit true {}".format(dut
, neighbor_ip
))
2869 errormsg
= "[DUT: {}]: Rbit false {}".format(dut
, neighbor_ip
)
2872 logger
.debug("Exiting lib API: {}".format(sys
._getframe
().f_code
.co_name
))
2876 @retry(attempts
=4, wait
=2, return_is_str
=True, initial_wait
=2)
2877 def verify_eor(tgen
, topo
, addr_type
, input_dict
, dut
, peer
):
2879 This API is to verify EOR
2883 * `tgen`: topogen object
2884 * `topo`: input json file data
2885 * `addr_type` : ip type ipv4/ipv6
2886 * `input_dict`: input dictionary, have details of DUT, for
2887 which user wants to test the data
2888 * `dut`: input dut router name
2903 "graceful-restart": True
2916 "graceful-restart": True
2928 result = verify_eor(tgen, topo, addr_type, input_dict, dut, peer)
2932 errormsg(str) or True
2934 logger
.debug("Entering lib API: {}".format(sys
._getframe
().f_code
.co_name
))
2936 for router
, rnode
in tgen
.routers().iteritems():
2940 bgp_addr_type
= topo
["routers"][router
]["bgp"]["address_family"]
2942 if addr_type
in bgp_addr_type
:
2943 if not check_address_types(addr_type
):
2946 bgp_neighbors
= bgp_addr_type
[addr_type
]["unicast"]["neighbor"]
2948 for bgp_neighbor
, peer_data
in bgp_neighbors
.items():
2949 if bgp_neighbor
!= peer
:
2952 for dest_link
, peer_dict
in peer_data
["dest_link"].items():
2953 data
= topo
["routers"][bgp_neighbor
]["links"]
2955 if dest_link
in data
:
2956 neighbor_ip
= data
[dest_link
][addr_type
].split("/")[0]
2959 "[DUT: %s]: Checking bgp graceful-restart" " show o/p %s",
2964 show_bgp_graceful_json
= run_frr_cmd(
2966 "show bgp {} neighbor {} graceful-restart json".format(
2967 addr_type
, neighbor_ip
2972 show_bgp_graceful_json_out
= show_bgp_graceful_json
[neighbor_ip
]
2974 if show_bgp_graceful_json_out
["neighborAddr"] == neighbor_ip
:
2975 logger
.info("[DUT: %s]: Neighbor ip matched %s", dut
, neighbor_ip
)
2977 errormsg
= "[DUT: %s]: Neighbor ip is NOT matched %s" % (
2983 if addr_type
== "ipv4":
2985 elif addr_type
== "ipv6":
2988 errormsg
= "Address type %s is not supported" % (addr_type
)
2991 eor_json
= show_bgp_graceful_json_out
[afi
]["endOfRibStatus"]
2992 if "endOfRibSend" in eor_json
:
2994 if eor_json
["endOfRibSend"]:
2996 "[DUT: %s]: EOR Send true for %s " "%s", dut
, neighbor_ip
, afi
2999 errormsg
= "[DUT: %s]: EOR Send false for %s" " %s" % (
3006 if "endOfRibRecv" in eor_json
:
3007 if eor_json
["endOfRibRecv"]:
3009 "[DUT: %s]: EOR Recv true %s " "%s", dut
, neighbor_ip
, afi
3012 errormsg
= "[DUT: %s]: EOR Recv false %s " "%s" % (
3019 if "endOfRibSentAfterUpdate" in eor_json
:
3020 if eor_json
["endOfRibSentAfterUpdate"]:
3022 "[DUT: %s]: EOR SendTime true for %s" " %s",
3028 errormsg
= "[DUT: %s]: EOR SendTime false for " "%s %s" % (
3035 logger
.debug("Exiting lib API: {}".format(sys
._getframe
().f_code
.co_name
))
3039 @retry(attempts
=4, wait
=2, return_is_str
=True, initial_wait
=2)
3040 def verify_f_bit(tgen
, topo
, addr_type
, input_dict
, dut
, peer
):
3042 This API is to verify f_bit in the BGP gr capability advertised
3043 by the neighbor router
3047 * `tgen`: topogen object
3048 * `topo`: input json file data
3049 * `addr_type` : ip type ipv4/ipv6
3050 * `input_dict`: input dictionary, have details of Device Under Test, for
3051 which user wants to test the data
3052 * `dut`: input dut router name
3067 "graceful-restart": True
3080 "graceful-restart": True
3092 result = verify_f_bit(tgen, topo, 'ipv4', input_dict, dut, peer)
3096 errormsg(str) or True
3099 logger
.debug("Entering lib API: {}".format(sys
._getframe
().f_code
.co_name
))
3101 for router
, rnode
in tgen
.routers().iteritems():
3105 bgp_addr_type
= topo
["routers"][router
]["bgp"]["address_family"]
3107 if addr_type
in bgp_addr_type
:
3108 if not check_address_types(addr_type
):
3111 bgp_neighbors
= bgp_addr_type
[addr_type
]["unicast"]["neighbor"]
3113 for bgp_neighbor
, peer_data
in bgp_neighbors
.items():
3114 if bgp_neighbor
!= peer
:
3117 for dest_link
, peer_dict
in peer_data
["dest_link"].items():
3118 data
= topo
["routers"][bgp_neighbor
]["links"]
3120 if dest_link
in data
:
3121 neighbor_ip
= data
[dest_link
][addr_type
].split("/")[0]
3124 "[DUT: {}]: Checking bgp graceful-restart show"
3125 " o/p {}".format(dut
, neighbor_ip
)
3128 show_bgp_graceful_json
= run_frr_cmd(
3130 "show bgp {} neighbor {} graceful-restart json".format(
3131 addr_type
, neighbor_ip
3136 show_bgp_graceful_json_out
= show_bgp_graceful_json
[neighbor_ip
]
3138 if show_bgp_graceful_json_out
["neighborAddr"] == neighbor_ip
:
3140 "[DUT: {}]: Neighbor ip matched {}".format(dut
, neighbor_ip
)
3143 errormsg
= "[DUT: {}]: Neighbor ip NOT a match {}".format(
3148 if "ipv4Unicast" in show_bgp_graceful_json_out
:
3149 if show_bgp_graceful_json_out
["ipv4Unicast"]["fBit"]:
3151 "[DUT: {}]: Fbit True for {} IPv4"
3152 " Unicast".format(dut
, neighbor_ip
)
3155 errormsg
= "[DUT: {}]: Fbit False for {} IPv4" " Unicast".format(
3160 elif "ipv6Unicast" in show_bgp_graceful_json_out
:
3161 if show_bgp_graceful_json_out
["ipv6Unicast"]["fBit"]:
3163 "[DUT: {}]: Fbit True for {} IPv6"
3164 " Unicast".format(dut
, neighbor_ip
)
3167 errormsg
= "[DUT: {}]: Fbit False for {} IPv6" " Unicast".format(
3172 show_bgp_graceful_json_out
["ipv4Unicast"]
3173 show_bgp_graceful_json_out
["ipv6Unicast"]
3175 logger
.debug("Exiting lib API: {}".format(sys
._getframe
().f_code
.co_name
))
3179 @retry(attempts
=4, wait
=2, return_is_str
=True, initial_wait
=2)
3180 def verify_graceful_restart_timers(tgen
, topo
, addr_type
, input_dict
, dut
, peer
):
3182 This API is to verify graceful restart timers, configured and recieved
3186 * `tgen`: topogen object
3187 * `topo`: input json file data
3188 * `addr_type` : ip type ipv4/ipv6
3189 * `input_dict`: input dictionary, have details of Device Under Test,
3190 for which user wants to test the data
3191 * `dut`: input dut router name
3195 # Configure graceful-restart
3201 "graceful-restart": "graceful-restart-helper"
3204 "gracefulrestart": ["restart-time 150"]
3211 "graceful-restart": "graceful-restart"
3218 result = verify_graceful_restart_timers(tgen, topo, 'ipv4', input_dict)
3222 errormsg(str) or True
3225 logger
.debug("Entering lib API: {}".format(sys
._getframe
().f_code
.co_name
))
3227 for router
, rnode
in tgen
.routers().iteritems():
3231 bgp_addr_type
= topo
["routers"][dut
]["bgp"]["address_family"]
3233 if addr_type
in bgp_addr_type
:
3234 if not check_address_types(addr_type
):
3237 bgp_neighbors
= bgp_addr_type
[addr_type
]["unicast"]["neighbor"]
3239 for bgp_neighbor
, peer_data
in bgp_neighbors
.items():
3240 if bgp_neighbor
!= peer
:
3243 for dest_link
, peer_dict
in peer_data
["dest_link"].items():
3244 data
= topo
["routers"][bgp_neighbor
]["links"]
3246 if dest_link
in data
:
3247 neighbor_ip
= data
[dest_link
][addr_type
].split("/")[0]
3250 "[DUT: {}]: Checking bgp graceful-restart show"
3251 " o/p {}".format(dut
, neighbor_ip
)
3254 show_bgp_graceful_json
= run_frr_cmd(
3256 "show bgp {} neighbor {} graceful-restart json".format(
3257 addr_type
, neighbor_ip
3262 show_bgp_graceful_json_out
= show_bgp_graceful_json
[neighbor_ip
]
3263 if show_bgp_graceful_json_out
["neighborAddr"] == neighbor_ip
:
3265 "[DUT: {}]: Neighbor ip matched {}".format(dut
, neighbor_ip
)
3268 errormsg
= "[DUT: {}]: Neighbor ip is NOT matched {}".format(
3273 # Graceful-restart timer
3274 if "graceful-restart" in input_dict
[peer
]["bgp"]:
3275 if "timer" in input_dict
[peer
]["bgp"]["graceful-restart"]:
3276 for rs_timer
, value
in input_dict
[peer
]["bgp"]["graceful-restart"][
3279 if rs_timer
== "restart-time":
3281 receivedTimer
= value
3283 show_bgp_graceful_json_out
["timers"][
3284 "receivedRestartTimer"
3289 "receivedRestartTimer is {}"
3290 " on {} from peer {}".format(
3291 receivedTimer
, router
, peer
3296 "receivedRestartTimer is not"
3297 " as expected {}".format(receivedTimer
)
3301 logger
.debug("Exiting lib API: {}".format(sys
._getframe
().f_code
.co_name
))
3305 @retry(attempts
=4, wait
=2, return_is_str
=True, initial_wait
=2)
3306 def verify_gr_address_family(tgen
, topo
, addr_type
, addr_family
, dut
):
3308 This API is to verify gr_address_family in the BGP gr capability advertised
3309 by the neighbor router
3313 * `tgen`: topogen object
3314 * `topo`: input json file data
3315 * `addr_type` : ip type ipv4/ipv6
3316 * `addr_type` : ip type IPV4 Unicast/IPV6 Unicast
3317 * `dut`: input dut router name
3322 result = verify_gr_address_family(tgen, topo, "ipv4", "ipv4Unicast", "r1")
3326 errormsg(str) or True
3329 logger
.debug("Entering lib API: {}".format(sys
._getframe
().f_code
.co_name
))
3331 for router
, rnode
in tgen
.routers().iteritems():
3335 bgp_addr_type
= topo
["routers"][router
]["bgp"]["address_family"]
3337 if addr_type
in bgp_addr_type
:
3338 if not check_address_types(addr_type
):
3341 bgp_neighbors
= bgp_addr_type
[addr_type
]["unicast"]["neighbor"]
3343 for bgp_neighbor
, peer_data
in bgp_neighbors
.items():
3344 for dest_link
, peer_dict
in peer_data
["dest_link"].items():
3345 data
= topo
["routers"][bgp_neighbor
]["links"]
3347 if dest_link
in data
:
3348 neighbor_ip
= data
[dest_link
][addr_type
].split("/")[0]
3351 "[DUT: {}]: Checking bgp graceful-restart"
3352 " show o/p {}".format(dut
, neighbor_ip
)
3355 show_bgp_graceful_json
= run_frr_cmd(
3357 "show bgp {} neighbor {} graceful-restart json".format(
3358 addr_type
, neighbor_ip
3363 show_bgp_graceful_json_out
= show_bgp_graceful_json
[neighbor_ip
]
3365 if show_bgp_graceful_json_out
["neighborAddr"] == neighbor_ip
:
3366 logger
.info("Neighbor ip matched {}".format(neighbor_ip
))
3368 errormsg
= "Neighbor ip NOT a match {}".format(neighbor_ip
)
3371 if addr_family
== "ipv4Unicast":
3372 if "ipv4Unicast" in show_bgp_graceful_json_out
:
3373 logger
.info("ipv4Unicast present for {} ".format(neighbor_ip
))
3376 errormsg
= "ipv4Unicast NOT present for {} ".format(neighbor_ip
)
3379 elif addr_family
== "ipv6Unicast":
3380 if "ipv6Unicast" in show_bgp_graceful_json_out
:
3381 logger
.info("ipv6Unicast present for {} ".format(neighbor_ip
))
3384 errormsg
= "ipv6Unicast NOT present for {} ".format(neighbor_ip
)
3387 errormsg
= "Aaddress family: {} present for {} ".format(
3388 addr_family
, neighbor_ip
3392 logger
.debug("Exiting lib API: {}".format(sys
._getframe
().f_code
.co_name
))