2 # SPDX-License-Identifier: ISC
4 # Copyright (c) 2019 by VMware, Inc. ("VMware")
5 # Used Copyright (c) 2018 by Network Device Education Foundation, Inc. ("NetDEF")
13 from time
import sleep
# Save the directory containing this file; it is used to find configuration
# files and to anchor the helper-library search paths below.
CWD = os.path.dirname(os.path.realpath(__file__))

# Anchor both entries on CWD: a bare os.path.join("../") is just "../", which
# Python resolves against whatever directory the process was started from.
sys.path.append(os.path.join(CWD, "../"))
sys.path.append(os.path.join(CWD, "../lib/"))
23 # pylint: disable=C0413
24 # Import topogen and topotest helpers
25 from lib
.topogen
import Topogen
, get_topogen
26 from lib
.topolog
import logger
28 # Required to instantiate the topology builder class.
30 # Import topoJson from lib, to create topology and initial configuration
31 from lib
.topojson
import build_config_from_json
35 verify_graceful_restart
,
40 verify_bgp_convergence
,
41 verify_gr_address_family
,
42 modify_bgp_config_when_bgpd_down
,
43 verify_graceful_restart_timers
,
44 verify_bgp_convergence_from_running_config
,
47 # Import common_config to use commonly used APIs
48 from lib
.common_config
import (
49 create_common_configuration
,
54 find_interface_with_greater_ip
,
58 get_frr_ipv6_linklocal
,
61 from lib
.common_config
import (
63 reset_config_on_routers
,
72 get_frr_ipv6_linklocal
,
74 required_linux_kernel_version
,
# Apply the "bgpd" pytest marker to every test collected from this module.
pytestmark = [pytest.mark.bgpd]
# Module-level default; setup_module() stores the result of
# verify_bgp_convergence() under this same name.
BGP_CONVERGENCE = False

# Graceful-restart timer constants.
GR_SELECT_DEFER_TIMER = 5
GR_STALEPATH_TIMER = 5

# Host prefixes used for the static routes, two per group, each available in
# an IPv4 and an IPv6 flavour keyed by address type.
NETWORK1_1 = {"ipv4": "192.0.2.1/32", "ipv6": "2001:DB8::1:1/128"}
NETWORK1_2 = {"ipv4": "192.0.2.2/32", "ipv6": "2001:DB8::2:1/128"}
NETWORK2_1 = {"ipv4": "192.0.2.3/32", "ipv6": "2001:DB8::3:1/128"}
NETWORK2_2 = {"ipv4": "192.0.2.4/32", "ipv6": "2001:DB8::4:1/128"}
NETWORK3_1 = {"ipv4": "192.0.2.5/32", "ipv6": "2001:DB8::5:1/128"}
NETWORK3_2 = {"ipv4": "192.0.2.6/32", "ipv6": "2001:DB8::6:1/128"}
NETWORK4_1 = {"ipv4": "192.0.2.7/32", "ipv6": "2001:DB8::7:1/128"}
NETWORK4_2 = {"ipv4": "192.0.2.8/32", "ipv6": "2001:DB8::8:1/128"}
NETWORK5_1 = {"ipv4": "192.0.2.9/32", "ipv6": "2001:DB8::9:1/128"}
NETWORK5_2 = {"ipv4": "192.0.2.10/32", "ipv6": "2001:DB8::10:1/128"}

# Next hop used for the static routes: Null0 for both address families.
NEXT_HOP_IP = {"ipv4": "Null0", "ipv6": "Null0"}

PREFERRED_NEXT_HOP = "link_local"
def configure_gr_followed_by_clear(tgen, topo, input_dict, tc_name, dut, peer):
    """
    Apply a BGP configuration, clear BGP on both routers and wait for
    convergence.

    `input_dict` is handed to create_router_bgp(); afterwards BGP is cleared
    for every address type in ADDR_TYPES, first on `dut` and then on `peer`,
    and convergence is checked against the running configuration.

    Parameters
    ----------
    * `tgen`: topogen object
    * `topo`: topology data passed through to create_router_bgp()
    * `input_dict`: BGP configuration to apply
    * `tc_name`: test case name, used only in assertion messages
    * `dut`: name of the first router to clear
    * `peer`: name of the second router to clear

    Usage
    -----
    result = configure_gr_followed_by_clear(
        tgen, topo, input_dict, tc_name, dut="r2", peer="r3"
    )
    assert result is True

    Returns
    -------
    True on success. A failing step raises AssertionError carrying the
    helper's error value.
    """

    result = create_router_bgp(tgen, topo, input_dict)
    assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)

    # Clear every address family on dut before touching peer, in that order.
    for router in (dut, peer):
        for addr_type in ADDR_TYPES:
            clear_bgp(tgen, addr_type, router)

    result = verify_bgp_convergence_from_running_config(tgen)
    assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)

    # The usage example asserts `result is True`, so report success explicitly.
    return True
# verify_stale_routes_list(): for each router named in input_dict that carries
# "static_routes", run an FRR show command with JSON output on the router node
# and look the expected prefixes up under the "routes" key of the reply.
# NOTE(review): several source lines of this function are not visible in this
# view (e.g. where `command` and `found_routes` are assigned, and the guards
# and returns of several branches); confirm against the full file.
125 def verify_stale_routes_list(tgen
, addr_type
, dut
, input_dict
):
127 This API is use verify Stale routes on refering the network with next hop value
130 * `tgen`: topogen object
131 * `dut`: input dut router name
132 * `addr_type` : ip type ipv4/ipv6
133 * `input_dict` : input dict, has details of static routes
142 "network": [NETWORK1_1[addr_type]],
150 result = verify_stale_routes_list(tgen, addr_type, dut, input_dict)
153 errormsg(str) or True
155 logger
.debug("Entering lib API: verify_stale_routes_list()")
156 router_list
= tgen
.routers()
# Initialised empty; no later use is visible in this view.
157 additional_nexthops_in_required_nhs
= []
161 for routerInput
in input_dict
.keys():
162 for router
, rnode
in router_list
.items():
165 # Verifying RIB routes
169 logger
.info("Checking router {} BGP RIB:".format(dut
))
170 if "static_routes" in input_dict
[routerInput
]:
171 static_routes
= input_dict
[routerInput
]["static_routes"]
172 for static_route
in static_routes
:
# setdefault() returns the existing value or stores None under the key, so
# the caller's static_route dict may gain "vrf", "community" and
# "largeCommunity" entries as a side effect.
177 vrf
= static_route
.setdefault("vrf", None)
178 community
= static_route
.setdefault("community", None)
179 largeCommunity
= static_route
.setdefault("largeCommunity", None)
# Command assembly: vrf / community / large-community qualifiers, or the
# plain "<command> <addr_type>" form. The conditions selecting between
# these assignments are not visible in this view.
181 cmd
= "{} vrf {} {}".format(command
, vrf
, addr_type
)
183 cmd
= "{} community {}".format(cmd
, community
)
185 cmd
= "{} large-community {}".format(cmd
, largeCommunity
)
187 cmd
= "{} {}".format(command
, addr_type
)
# Always request JSON so the reply can be inspected as a dict.
188 cmd
= "{} json".format(cmd
)
189 rib_routes_json
= run_frr_cmd(rnode
, cmd
, isjson
=True)
190 # Verifying output dictionary rib_routes_json is not empty
191 if bool(rib_routes_json
) == False:
192 errormsg
= "[DUT: {}]: No route found in rib of router".format(
# A "warning" key in the reply is turned into the error message.
196 elif "warning" in rib_routes_json
:
197 errormsg
= "[DUT: {}]: {}".format(
198 router
, rib_routes_json
["warning"]
201 network
= static_route
["network"]
202 if "no_of_ip" in static_route
:
203 no_of_ip
= static_route
["no_of_ip"]
206 # Generating IPs for verification
207 ip_list
= generate_ips(network
, no_of_ip
)
209 for st_rt
in ip_list
:
# Normalise the prefix text through ipaddress.ip_network() before it is
# used as a lookup key in the JSON reply.
210 st_rt
= str(ipaddress
.ip_network(st_rt
))
211 _addr_type
= validate_ip_address(st_rt
)
# Prefixes of a different address family than requested are singled out here.
212 if _addr_type
!= addr_type
:
214 if st_rt
in rib_routes_json
["routes"]:
217 found_routes
.append(st_rt
)
218 for mnh
in range(0, len(rib_routes_json
["routes"][st_rt
])):
222 for rib_r
in rib_routes_json
["routes"][st_rt
][
# Last visible statement: a plain string is returned here, whereas the
# description above lists "errormsg(str) or True" as the return values.
229 return "error msg - no hops found"
# setup_module(): module-level setup. Checks the kernel version, logs the
# suite start time, builds the Topogen topology from the JSON file next to
# this test, applies the JSON-derived configuration and asserts that BGP
# has converged.
# NOTE(review): some source lines of this function are not visible in this
# view (for example any `global` declarations and the topology start call);
# confirm against the full file.
232 def setup_module(mod
):
234 Sets up the pytest environment
239 # Required linux kernel version for this suite to run.
240 result
= required_linux_kernel_version("4.16")
# Anything other than True from the kernel check skips the whole module.
241 if result
is not True:
242 pytest
.skip("Kernel requirements are not met, kernel version should be >=4.16")
246 testsuite_run_time
= time
.asctime(time
.localtime(time
.time()))
247 logger
.info("Testsuite start time: {}".format(testsuite_run_time
))
248 logger
.info("=" * 40)
250 logger
.info("Running setup_module to create topology")
252 # This function initiates the topology build with Topogen...
# The topology JSON is looked up in CWD, the directory of this test file.
253 json_file
= "{}/bgp_gr_functionality_topo3.json".format(CWD
)
254 tgen
= Topogen(json_file
, mod
.__name
__)
256 topo
= tgen
.json_topo
257 # ... and here it calls Mininet initialization functions.
259 # Starting topology, create tmp files which are loaded to routers
260 # to start daemons and then start routers
263 # Creating configuration from JSON
264 build_config_from_json(tgen
, topo
)
266 # API call to verify whether BGP has converged
# ADDR_TYPES is also read by other functions in this file; presumably it is
# declared global on a line not visible here. TODO confirm.
267 ADDR_TYPES
= check_address_types()
# verify_bgp_convergence() is invoked once per address type, but addr_type
# itself is not passed to it, so every iteration makes the same call.
269 for addr_type
in ADDR_TYPES
:
270 BGP_CONVERGENCE
= verify_bgp_convergence(tgen
, topo
)
271 assert BGP_CONVERGENCE
is True, "setup_module : Failed \n Error:" " {}".format(
275 logger
.info("Running setup_module() done")
# teardown_module(): module-level teardown. Logs that the topology is being
# deleted and formats the suite end time.
# NOTE(review): the statements that stop the topology and emit the end-time
# message are not visible in this view; confirm against the full file.
278 def teardown_module(mod
):
280 Teardown the pytest environment
285 logger
.info("Running teardown_module to delete topology")
289 # Stop topology and remove tmp files
# End time is rendered the same way as the start time in setup_module():
# time.asctime() over the local time.
293 "Testsuite end time: {}".format(time
.asctime(time
.localtime(time
.time())))
295 logger
.info("=" * 40)
298 ################################################################################
302 ################################################################################
# test_bgp_gr_stale_routes(): creates static routes, checks them in the BGP
# RIB of r3, r2 and r1, configures graceful restart between r2 and r3, then
# toggles "graceful-restart-disable" on r3 and checks routes and stale routes
# on r2 after each change. The step() strings describe each phase.
# NOTE(review): `tgen`, `topo` and `dut` are used below but are not assigned on
# any line visible in this view, and most dict literals are only partially
# visible; confirm against the full file.
303 def test_bgp_gr_stale_routes(request
):
305 tc_name
= request
.node
.name
306 write_test_header(tc_name
)
308 step("Verify the router failures")
309 if tgen
.routers_have_failure():
310 check_router_status(tgen
)
312 step("Creating 5 static Routes in Router R3 with NULL0 as Next hop")
313 for addr_type
in ADDR_TYPES
:
# Five route entries follow, each pairing the two NETWORKn_* prefixes of one
# group with the Null0 next hop for the current address type.
318 "network": [NETWORK1_1
[addr_type
]] + [NETWORK1_2
[addr_type
]],
319 "next_hop": NEXT_HOP_IP
[addr_type
],
322 "network": [NETWORK2_1
[addr_type
]] + [NETWORK2_2
[addr_type
]],
323 "next_hop": NEXT_HOP_IP
[addr_type
],
326 "network": [NETWORK3_1
[addr_type
]] + [NETWORK3_2
[addr_type
]],
327 "next_hop": NEXT_HOP_IP
[addr_type
],
330 "network": [NETWORK4_1
[addr_type
]] + [NETWORK4_2
[addr_type
]],
331 "next_hop": NEXT_HOP_IP
[addr_type
],
334 "network": [NETWORK5_1
[addr_type
]] + [NETWORK5_2
[addr_type
]],
335 "next_hop": NEXT_HOP_IP
[addr_type
],
340 result
= create_static_routes(tgen
, input_dict_1
)
341 assert result
is True, "Testcase {} : Failed \n Error: {}".format(
344 step("verifying Created Route at R3 in VRF default")
345 for addr_type
in ADDR_TYPES
:
# The expected routes are taken from the topology data of the named router.
347 input_dict_1
= {"r3": topo
["routers"]["r3"]}
348 result
= verify_bgp_rib(tgen
, addr_type
, dut
, input_dict_1
)
349 assert result
is True, "Testcase {} :Failed \n Error {}".format(tc_name
, result
)
351 step("verifying Created Route at R2 in VRF default")
352 for addr_type
in ADDR_TYPES
:
354 input_dict_1
= {"r2": topo
["routers"]["r2"]}
355 result
= verify_bgp_rib(tgen
, addr_type
, dut
, input_dict_1
)
356 assert result
is True, "Testcase {} :Failed \n Error {}".format(tc_name
, result
)
357 step("importing vrf RED on R2 under Address Family")
358 for addr_type
in ADDR_TYPES
:
# Per address type, the unicast family is given an import of vrf "default".
366 addr_type
: {"unicast": {"import": {"vrf": "default"}}}
372 result
= create_router_bgp(tgen
, topo
, input_import_vrf
)
373 assert result
is True, "Testcase {} : Failed \n Error: {}".format(
377 step("verifying static Routes at R2 in VRF RED")
378 for addr_type
in ADDR_TYPES
:
380 input_dict_1
= {"r2": topo
["routers"]["r2"]}
381 result
= verify_bgp_rib(tgen
, addr_type
, dut
, input_dict_1
)
382 assert result
is True, "Testcase {} :Failed \n Error {}".format(tc_name
, result
)
384 step("verifying static Routes at R1 in VRF RED")
385 for addr_type
in ADDR_TYPES
:
387 input_dict_1
= {"r1": topo
["routers"]["r1"]}
388 result
= verify_bgp_rib(tgen
, addr_type
, dut
, input_dict_1
)
389 assert result
is True, "Testcase {} :Failed \n Error {}".format(tc_name
, result
)
391 step("Configuring Graceful restart at R2 and R3 ")
396 "graceful-restart": {
397 "graceful-restart": True,
402 "bgp": {"local_as": "300", "graceful-restart": {"graceful-restart": True}}
# Apply the configuration, then clear BGP on r2 followed by r3.
406 configure_gr_followed_by_clear(tgen
, topo
, input_dict
, tc_name
, dut
="r2", peer
="r3")
408 step("verify Graceful restart at R2")
409 for addr_type
in ADDR_TYPES
:
410 result
= verify_graceful_restart(
411 tgen
, topo
, addr_type
, input_dict
, dut
="r2", peer
="r3"
413 assert result
is True, "Testcase {} :Failed \n Error {}".format(tc_name
, result
)
415 step("verify Graceful restart at R3")
416 for addr_type
in ADDR_TYPES
:
417 result
= verify_graceful_restart(
418 tgen
, topo
, addr_type
, input_dict
, dut
="r3", peer
="r2"
420 assert result
is True, "Testcase {} :Failed \n Error {}".format(tc_name
, result
)
422 step("Configuring Graceful-restart-disable at R3")
427 "graceful-restart": {
428 "graceful-restart": False,
433 "bgp": {"local_as": "300", "graceful-restart": {"graceful-restart": False}}
436 configure_gr_followed_by_clear(tgen
, topo
, input_dict
, tc_name
, dut
="r3", peer
="r2")
438 step("Verify Graceful-restart-disable at R3")
439 for addr_type
in ADDR_TYPES
:
440 result
= verify_graceful_restart(
441 tgen
, topo
, addr_type
, input_dict
, dut
="r3", peer
="r2"
443 assert result
is True, "Testcase {} :Failed \n Error {}".format(tc_name
, result
)
# Five-iteration loop; `iteration` itself is not referenced on any visible
# line. The extent of the loop body cannot be determined from this view.
445 for iteration
in range(5):
446 step("graceful-restart-disable:True at R3")
450 "graceful-restart": {
451 "graceful-restart-disable": True,
456 configure_gr_followed_by_clear(
457 tgen
, topo
, input_dict
, tc_name
, dut
="r3", peer
="r2"
460 step("Verifying Routes at R2 on enabling GRD")
462 for addr_type
in ADDR_TYPES
:
463 input_dict_1
= {"r2": topo
["routers"]["r2"]}
464 result
= verify_bgp_rib(tgen
, addr_type
, dut
, input_dict_1
)
465 assert result
is True, "Testcase {} :Failed \n Error {}".format(
469 step("Verify stale Routes in Router R2 enabling GRD")
470 for addr_type
in ADDR_TYPES
:
473 verify_nh_for_static_rtes
= {
477 "network": [NETWORK1_1
[addr_type
]],
484 bgp_rib_next_hops
= verify_stale_routes_list(
485 tgen
, addr_type
, dut
, verify_nh_for_static_rtes
# The check passes only when the returned value has length exactly 1.
# `expected=True` is handed to str.format() as a keyword with no matching
# placeholder, so it has no effect on the failure message.
488 len(bgp_rib_next_hops
) == 1
489 ) is True, "Testcase {} : Failed \n Error: {}".format(
490 tc_name
, bgp_rib_next_hops
, expected
=True
493 step("graceful-restart-disable:False at R3")
497 "graceful-restart": {
498 "graceful-restart-disable": False,
503 configure_gr_followed_by_clear(
504 tgen
, topo
, input_dict
, tc_name
, dut
="r3", peer
="r2"
507 step("Verifying Routes at R2 on disabling GRD")
509 for addr_type
in ADDR_TYPES
:
510 input_dict_1
= {"r2": topo
["routers"]["r2"]}
511 result
= verify_bgp_rib(tgen
, addr_type
, dut
, input_dict_1
)
512 assert result
is True, "Testcase {} :Failed \n Error {}".format(
516 step("Verify stale Routes in Router R2 on disabling GRD")
517 for addr_type
in ADDR_TYPES
:
520 verify_nh_for_static_rtes
= {
524 "network": [NETWORK1_1
[addr_type
]],
531 bgp_rib_next_hops
= verify_stale_routes_list(
532 tgen
, addr_type
, dut
, verify_nh_for_static_rtes
# Same length-of-one check as in the enabling phase, stored in a named
# boolean; the failure message reports that boolean rather than the value
# returned by verify_stale_routes_list().
535 stale_route_status
= len(bgp_rib_next_hops
) == 1
537 stale_route_status
is True
538 ), "Testcase {} : Failed \n Error: {}".format(
539 tc_name
, stale_route_status
, expected
=True
541 write_test_footer(tc_name
)