]> git.proxmox.com Git - mirror_frr.git/blob - tests/topotests/bgp_ecmp_topo2/test_ibgp_ecmp_topo2.py
*: auto-convert to SPDX License IDs
[mirror_frr.git] / tests / topotests / bgp_ecmp_topo2 / test_ibgp_ecmp_topo2.py
1 #!/usr/bin/env python
2 # SPDX-License-Identifier: ISC
3
4 #
5 # Copyright (c) 2019 by VMware, Inc. ("VMware")
6 # Used Copyright (c) 2018 by Network Device Education Foundation, Inc.
7 # ("NetDEF") in this file.
8 #
9
10
11 """
12 Following tests are covered to test ecmp functionality on EBGP.
13 1. Verify routes installed as per maximum-paths configuration (8/16/32)
14 2. Disable/Shut selected paths nexthops and verify other next are installed in
15 the RIB of DUT. Enable interfaces and verify RIB count.
16 3. Verify BGP table and RIB in DUT after clear BGP routes and neighbors.
17 4. Verify routes are cleared from BGP and RIB table of DUT when
18 redistribute static configuration is removed.
19 5. Shut BGP neighbors one by one and verify BGP and routing table updated
20 accordingly in DUT
21 6. Delete static routes and verify routers are cleared from BGP table and RIB
22 of DUT.
23 7. Verify routes are cleared from BGP and RIB table of DUT when advertise
24 network configuration is removed.
25 """
26 import os
27 import sys
28 import time
29 import pytest
30
31 # Save the Current Working Directory to find configuration files.
32 CWD = os.path.dirname(os.path.realpath(__file__))
33 sys.path.append(os.path.join(CWD, "../"))
34 sys.path.append(os.path.join(CWD, "../../"))
35
36 # pylint: disable=C0413
37 # Import topogen and topotest helpers
38 from lib.topogen import Topogen, get_topogen
39
40 from lib.common_config import (
41 start_topology,
42 write_test_header,
43 write_test_footer,
44 verify_rib,
45 create_static_routes,
46 check_address_types,
47 interface_status,
48 reset_config_on_routers,
49 required_linux_kernel_version,
50 )
51 from lib.topolog import logger
52 from lib.bgp import verify_bgp_convergence, create_router_bgp, clear_bgp
53 from lib.topojson import build_config_from_json
54
55
56 pytestmark = [pytest.mark.bgpd, pytest.mark.staticd]
57
58
59 # Global variables
60 NEXT_HOPS = {"ipv4": [], "ipv6": []}
61 INTF_LIST_R3 = []
62 INTF_LIST_R2 = []
63 NETWORK = {"ipv4": "11.0.20.1/32", "ipv6": "1::/64"}
64 NEXT_HOP_IP = {"ipv4": "10.0.0.1", "ipv6": "fd00::1"}
65 BGP_CONVERGENCE = False
66
67
68 def setup_module(mod):
69 """
70 Sets up the pytest environment.
71
72 * `mod`: module name
73 """
74 global NEXT_HOPS, INTF_LIST_R3, INTF_LIST_R2, TEST_STATIC
75 global ADDR_TYPES
76
77 # Required linux kernel version for this suite to run.
78 result = required_linux_kernel_version("4.15")
79 if result is not True:
80 pytest.skip("Kernel requirements are not met, kernel version should be >=4.15")
81
82 testsuite_run_time = time.asctime(time.localtime(time.time()))
83 logger.info("Testsuite start time: {}".format(testsuite_run_time))
84 logger.info("=" * 40)
85
86 logger.info("Running setup_module to create topology")
87
88 # This function initiates the topology build with Topogen...
89 json_file = "{}/ibgp_ecmp_topo2.json".format(CWD)
90 tgen = Topogen(json_file, mod.__name__)
91 global topo
92 topo = tgen.json_topo
93
94 # Starting topology, create tmp files which are loaded to routers
95 # to start daemons and then start routers
96 start_topology(tgen)
97
98 # Creating configuration from JSON
99 build_config_from_json(tgen, topo)
100
101 # Don't run this test if we have any failure.
102 if tgen.routers_have_failure():
103 pytest.skip(tgen.errors)
104
105 # tgen.mininet_cli()
106 # Api call verify whether BGP is converged
107 ADDR_TYPES = check_address_types()
108
109 for addr_type in ADDR_TYPES:
110 BGP_CONVERGENCE = verify_bgp_convergence(tgen, topo)
111 assert BGP_CONVERGENCE is True, "setup_module :Failed \n Error:" " {}".format(
112 BGP_CONVERGENCE
113 )
114
115 link_data = [
116 val for links, val in topo["routers"]["r2"]["links"].items() if "r3" in links
117 ]
118 for adt in ADDR_TYPES:
119 NEXT_HOPS[adt] = [val[adt].split("/")[0] for val in link_data]
120 if adt == "ipv4":
121 NEXT_HOPS[adt] = sorted(NEXT_HOPS[adt], key=lambda x: int(x.split(".")[2]))
122 elif adt == "ipv6":
123 NEXT_HOPS[adt] = sorted(
124 NEXT_HOPS[adt], key=lambda x: int(x.split(":")[-3], 16)
125 )
126
127 INTF_LIST_R2 = [val["interface"].split("/")[0] for val in link_data]
128 INTF_LIST_R2 = sorted(INTF_LIST_R2, key=lambda x: int(x.split("eth")[1]))
129
130 link_data = [
131 val for links, val in topo["routers"]["r3"]["links"].items() if "r2" in links
132 ]
133 INTF_LIST_R3 = [val["interface"].split("/")[0] for val in link_data]
134 INTF_LIST_R3 = sorted(INTF_LIST_R3, key=lambda x: int(x.split("eth")[1]))
135
136 # STATIC_ROUTE = True
137 logger.info("Running setup_module() done")
138
139
140 def teardown_module():
141 """
142 Teardown the pytest environment.
143
144 * `mod`: module name
145 """
146
147 logger.info("Running teardown_module to delete topology")
148
149 tgen = get_topogen()
150
151 # Stop toplogy and Remove tmp files
152 tgen.stop_topology()
153
154
155 def static_or_nw(tgen, topo, tc_name, test_type, dut):
156
157 if test_type == "redist_static":
158 input_dict_static = {
159 dut: {
160 "static_routes": [
161 {"network": NETWORK["ipv4"], "next_hop": NEXT_HOP_IP["ipv4"]},
162 {"network": NETWORK["ipv6"], "next_hop": NEXT_HOP_IP["ipv6"]},
163 ]
164 }
165 }
166 logger.info("Configuring static route on router %s", dut)
167 result = create_static_routes(tgen, input_dict_static)
168 assert result is True, "Testcase {} : Failed \n Error: {}".format(
169 tc_name, result
170 )
171
172 input_dict_2 = {
173 dut: {
174 "bgp": {
175 "address_family": {
176 "ipv4": {
177 "unicast": {"redistribute": [{"redist_type": "static"}]}
178 },
179 "ipv6": {
180 "unicast": {"redistribute": [{"redist_type": "static"}]}
181 },
182 }
183 }
184 }
185 }
186
187 logger.info("Configuring redistribute static route on router %s", dut)
188 result = create_router_bgp(tgen, topo, input_dict_2)
189 assert result is True, "Testcase {} : Failed \n Error: {}".format(
190 tc_name, result
191 )
192
193 elif test_type == "advertise_nw":
194 input_dict_nw = {
195 dut: {
196 "bgp": {
197 "address_family": {
198 "ipv4": {
199 "unicast": {
200 "advertise_networks": [{"network": NETWORK["ipv4"]}]
201 }
202 },
203 "ipv6": {
204 "unicast": {
205 "advertise_networks": [{"network": NETWORK["ipv6"]}]
206 }
207 },
208 }
209 }
210 }
211 }
212
213 logger.info(
214 "Advertising networks %s %s from router %s",
215 NETWORK["ipv4"],
216 NETWORK["ipv6"],
217 dut,
218 )
219 result = create_router_bgp(tgen, topo, input_dict_nw)
220 assert result is True, "Testcase {} : Failed \n Error: {}".format(
221 tc_name, result
222 )
223
224
225 @pytest.mark.parametrize("ecmp_num", ["8", "16", "32"])
226 @pytest.mark.parametrize("test_type", ["redist_static", "advertise_nw"])
227 def test_modify_ecmp_max_paths(request, ecmp_num, test_type):
228 """
229 Verify routes installed as per maximum-paths
230 configuration (8/16/32).
231 """
232
233 tc_name = request.node.name
234 write_test_header(tc_name)
235 tgen = get_topogen()
236
237 reset_config_on_routers(tgen)
238
239 static_or_nw(tgen, topo, tc_name, test_type, "r2")
240
241 input_dict = {
242 "r3": {
243 "bgp": {
244 "address_family": {
245 "ipv4": {
246 "unicast": {
247 "maximum_paths": {
248 "ibgp": ecmp_num,
249 }
250 }
251 },
252 "ipv6": {
253 "unicast": {
254 "maximum_paths": {
255 "ibgp": ecmp_num,
256 }
257 }
258 },
259 }
260 }
261 }
262 }
263
264 logger.info("Configuring bgp maximum-paths %s on router r3", ecmp_num)
265 result = create_router_bgp(tgen, topo, input_dict)
266 assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
267
268 # Verifying RIB routes
269 dut = "r3"
270 protocol = "bgp"
271
272 for addr_type in ADDR_TYPES:
273 input_dict_1 = {"r3": {"static_routes": [{"network": NETWORK[addr_type]}]}}
274
275 logger.info("Verifying %s routes on r3", addr_type)
276
277 # Test only the count of nexthops, not the specific nexthop addresses -
278 # they're not deterministic
279 #
280 result = verify_rib(
281 tgen,
282 addr_type,
283 dut,
284 input_dict_1,
285 next_hop=NEXT_HOPS[addr_type][: int(ecmp_num)],
286 protocol=protocol,
287 count_only=True,
288 )
289
290 assert result is True, "Testcase {} : Failed \n Error: {}".format(
291 tc_name, result
292 )
293
294 write_test_footer(tc_name)
295
296
297 @pytest.mark.parametrize("ecmp_num", ["8", "16", "32"])
298 @pytest.mark.parametrize("test_type", ["redist_static", "advertise_nw"])
299 def test_ecmp_after_clear_bgp(request, ecmp_num, test_type):
300 """Verify BGP table and RIB in DUT after clear BGP routes and neighbors"""
301
302 tc_name = request.node.name
303 write_test_header(tc_name)
304 tgen = get_topogen()
305
306 reset_config_on_routers(tgen)
307
308 # Verifying RIB routes
309 dut = "r3"
310 protocol = "bgp"
311
312 static_or_nw(tgen, topo, tc_name, test_type, "r2")
313 for addr_type in ADDR_TYPES:
314 input_dict_1 = {"r3": {"static_routes": [{"network": NETWORK[addr_type]}]}}
315
316 logger.info("Verifying %s routes on r3", addr_type)
317 result = verify_rib(
318 tgen,
319 addr_type,
320 dut,
321 input_dict_1,
322 next_hop=NEXT_HOPS[addr_type][: int(ecmp_num)],
323 protocol=protocol,
324 )
325 assert result is True, "Testcase {} : Failed \n Error: {}".format(
326 tc_name, result
327 )
328
329 # Clear BGP
330 for addr_type in ADDR_TYPES:
331 clear_bgp(tgen, addr_type, dut)
332
333 # Verify BGP convergence
334 result = verify_bgp_convergence(tgen, topo)
335 assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
336
337 for addr_type in ADDR_TYPES:
338 input_dict_1 = {"r3": {"static_routes": [{"network": NETWORK[addr_type]}]}}
339 logger.info("Verifying %s routes on r3", addr_type)
340 result = verify_rib(
341 tgen,
342 addr_type,
343 dut,
344 input_dict_1,
345 next_hop=NEXT_HOPS[addr_type][: int(ecmp_num)],
346 protocol=protocol,
347 )
348 assert result is True, "Testcase {} : Failed \n Error: {}".format(
349 tc_name, result
350 )
351
352 write_test_footer(tc_name)
353
354
355 def test_ecmp_remove_redistribute_static(request):
356 """Verify routes are cleared from BGP and RIB table of DUT when
357 redistribute static configuration is removed."""
358
359 tc_name = request.node.name
360 write_test_header(tc_name)
361 tgen = get_topogen()
362
363 reset_config_on_routers(tgen)
364 static_or_nw(tgen, topo, tc_name, "redist_static", "r2")
365 for addr_type in ADDR_TYPES:
366
367 # Verifying RIB routes
368 dut = "r3"
369 protocol = "bgp"
370 input_dict_1 = {"r3": {"static_routes": [{"network": NETWORK[addr_type]}]}}
371
372 logger.info("Verifying %s routes on r3", addr_type)
373 result = verify_rib(
374 tgen,
375 addr_type,
376 dut,
377 input_dict_1,
378 next_hop=NEXT_HOPS[addr_type],
379 protocol=protocol,
380 )
381 assert result is True, "Testcase {} : Failed \n Error: {}".format(
382 tc_name, result
383 )
384
385 input_dict_2 = {
386 "r2": {
387 "bgp": {
388 "address_family": {
389 "ipv4": {
390 "unicast": {
391 "redistribute": [{"redist_type": "static", "delete": True}]
392 }
393 },
394 "ipv6": {
395 "unicast": {
396 "redistribute": [{"redist_type": "static", "delete": True}]
397 }
398 },
399 }
400 }
401 }
402 }
403
404 logger.info("Remove redistribute static")
405 result = create_router_bgp(tgen, topo, input_dict_2)
406 assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
407
408 for addr_type in ADDR_TYPES:
409
410 # Verifying RIB routes
411 dut = "r3"
412 protocol = "bgp"
413 input_dict_1 = {"r3": {"static_routes": [{"network": NETWORK[addr_type]}]}}
414
415 logger.info("Verifying %s routes on r3 are deleted", addr_type)
416 result = verify_rib(
417 tgen,
418 addr_type,
419 dut,
420 input_dict_1,
421 next_hop=[],
422 protocol=protocol,
423 expected=False,
424 )
425 assert (
426 result is not True
427 ), "Testcase {} : Failed \n Expected: Routes still present in {} RIB. Found: {}".format(
428 tc_name, dut, result
429 )
430
431 logger.info("Enable redistribute static")
432 input_dict_2 = {
433 "r2": {
434 "bgp": {
435 "address_family": {
436 "ipv4": {"unicast": {"redistribute": [{"redist_type": "static"}]}},
437 "ipv6": {"unicast": {"redistribute": [{"redist_type": "static"}]}},
438 }
439 }
440 }
441 }
442 result = create_router_bgp(tgen, topo, input_dict_2)
443 assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
444
445 for addr_type in ADDR_TYPES:
446 # Verifying RIB routes
447 dut = "r3"
448 protocol = "bgp"
449 input_dict_1 = {"r3": {"static_routes": [{"network": NETWORK[addr_type]}]}}
450 logger.info("Verifying %s routes on r3", addr_type)
451 result = verify_rib(
452 tgen,
453 addr_type,
454 dut,
455 input_dict_1,
456 next_hop=NEXT_HOPS[addr_type],
457 protocol=protocol,
458 )
459 assert result is True, "Testcase {} : Failed \n Error: {}".format(
460 tc_name, result
461 )
462
463 write_test_footer(tc_name)
464
465
466 @pytest.mark.parametrize("test_type", ["redist_static", "advertise_nw"])
467 def test_ecmp_shut_bgp_neighbor(request, test_type):
468 """Shut BGP neighbors one by one and verify BGP and routing table updated
469 accordingly in DUT"""
470
471 tc_name = request.node.name
472 write_test_header(tc_name)
473 tgen = get_topogen()
474
475 logger.info(INTF_LIST_R2)
476 # Verifying RIB routes
477 dut = "r3"
478 protocol = "bgp"
479
480 reset_config_on_routers(tgen)
481 static_or_nw(tgen, topo, tc_name, test_type, "r2")
482
483 for addr_type in ADDR_TYPES:
484 input_dict = {"r3": {"static_routes": [{"network": NETWORK[addr_type]}]}}
485
486 logger.info("Verifying %s routes on r3", addr_type)
487 result = verify_rib(
488 tgen,
489 addr_type,
490 dut,
491 input_dict,
492 next_hop=NEXT_HOPS[addr_type],
493 protocol=protocol,
494 )
495 assert result is True, "Testcase {} : Failed \n Error: {}".format(
496 tc_name, result
497 )
498
499 for intf_num in range(len(INTF_LIST_R2) + 1, 16):
500 intf_val = INTF_LIST_R2[intf_num : intf_num + 16]
501
502 input_dict_1 = {"r2": {"interface_list": [intf_val], "status": "down"}}
503 logger.info("Shutting down neighbor interface {} on r2".format(intf_val))
504 result = interface_status(tgen, topo, input_dict_1)
505 assert result is True, "Testcase {} : Failed \n Error: {}".format(
506 tc_name, result
507 )
508
509 for addr_type in ADDR_TYPES:
510 if intf_num + 16 < 32:
511 check_hops = NEXT_HOPS[addr_type]
512 else:
513 check_hops = []
514
515 input_dict = {"r3": {"static_routes": [{"network": NETWORK[addr_type]}]}}
516 logger.info("Verifying %s routes on r3", addr_type)
517 result = verify_rib(
518 tgen, addr_type, dut, input_dict, next_hop=check_hops, protocol=protocol
519 )
520 assert result is True, "Testcase {} : Failed \n Error: {}".format(
521 tc_name, result
522 )
523
524 input_dict_1 = {"r2": {"interface_list": INTF_LIST_R2, "status": "up"}}
525
526 logger.info("Enabling all neighbor interface {} on r2")
527 result = interface_status(tgen, topo, input_dict_1)
528 assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
529
530 static_or_nw(tgen, topo, tc_name, test_type, "r2")
531 for addr_type in ADDR_TYPES:
532 input_dict = {"r3": {"static_routes": [{"network": NETWORK[addr_type]}]}}
533
534 logger.info("Verifying %s routes on r3", addr_type)
535 result = verify_rib(
536 tgen,
537 addr_type,
538 dut,
539 input_dict,
540 next_hop=NEXT_HOPS[addr_type],
541 protocol=protocol,
542 )
543 assert result is True, "Testcase {} : Failed \n Error: {}".format(
544 tc_name, result
545 )
546
547 write_test_footer(tc_name)
548
549
550 def test_ecmp_remove_static_route(request):
551 """
552 Delete static routes and verify routers are cleared from BGP table,
553 and RIB of DUT.
554 """
555
556 tc_name = request.node.name
557 write_test_header(tc_name)
558 tgen = get_topogen()
559
560 # Verifying RIB routes
561 dut = "r3"
562 protocol = "bgp"
563
564 reset_config_on_routers(tgen)
565
566 static_or_nw(tgen, topo, tc_name, "redist_static", "r2")
567 for addr_type in ADDR_TYPES:
568 input_dict_1 = {"r3": {"static_routes": [{"network": NETWORK[addr_type]}]}}
569
570 logger.info("Verifying %s routes on r3", addr_type)
571 result = verify_rib(
572 tgen,
573 addr_type,
574 dut,
575 input_dict_1,
576 next_hop=NEXT_HOPS[addr_type],
577 protocol=protocol,
578 )
579 assert result is True, "Testcase {} : Failed \n Error: {}".format(
580 tc_name, result
581 )
582
583 for addr_type in ADDR_TYPES:
584 input_dict_2 = {
585 "r2": {
586 "static_routes": [
587 {
588 "network": NETWORK[addr_type],
589 "next_hop": NEXT_HOP_IP[addr_type],
590 "delete": True,
591 }
592 ]
593 }
594 }
595
596 logger.info("Remove static routes")
597 result = create_static_routes(tgen, input_dict_2)
598 assert result is True, "Testcase {} : Failed \n Error: {}".format(
599 tc_name, result
600 )
601
602 logger.info("Verifying %s routes on r3 are removed", addr_type)
603 result = verify_rib(
604 tgen,
605 addr_type,
606 dut,
607 input_dict_2,
608 next_hop=[],
609 protocol=protocol,
610 expected=False,
611 )
612 assert (
613 result is not True
614 ), "Testcase {} : Failed \n Expected: Routes still present in {} RIB. Found: {}".format(
615 tc_name, dut, result
616 )
617
618 for addr_type in ADDR_TYPES:
619 # Enable static routes
620 input_dict_4 = {
621 "r2": {
622 "static_routes": [
623 {"network": NETWORK[addr_type], "next_hop": NEXT_HOP_IP[addr_type]}
624 ]
625 }
626 }
627
628 logger.info("Enable static route")
629 result = create_static_routes(tgen, input_dict_4)
630 assert result is True, "Testcase {} : Failed \n Error: {}".format(
631 tc_name, result
632 )
633
634 logger.info("Verifying %s routes on r3", addr_type)
635 result = verify_rib(
636 tgen,
637 addr_type,
638 dut,
639 input_dict_4,
640 next_hop=NEXT_HOPS[addr_type],
641 protocol=protocol,
642 )
643 assert result is True, "Testcase {} : Failed \n Error: {}".format(
644 tc_name, result
645 )
646
647 write_test_footer(tc_name)
648
649
650 def test_ecmp_remove_nw_advertise(request):
651 """
652 Verify routes are cleared from BGP and RIB table of DUT,
653 when advertise network configuration is removed
654 """
655
656 tc_name = request.node.name
657 write_test_header(tc_name)
658 tgen = get_topogen()
659
660 # Verifying RIB routes
661 dut = "r3"
662 protocol = "bgp"
663
664 reset_config_on_routers(tgen)
665 static_or_nw(tgen, topo, tc_name, "advertise_nw", "r2")
666 for addr_type in ADDR_TYPES:
667 input_dict = {"r3": {"static_routes": [{"network": NETWORK[addr_type]}]}}
668
669 logger.info("Verifying %s routes on r3", addr_type)
670 result = verify_rib(
671 tgen,
672 addr_type,
673 dut,
674 input_dict,
675 next_hop=NEXT_HOPS[addr_type],
676 protocol=protocol,
677 )
678 assert result is True, "Testcase {} : Failed \n Error: {}".format(
679 tc_name, result
680 )
681
682 input_dict_3 = {
683 "r2": {
684 "bgp": {
685 "address_family": {
686 "ipv4": {
687 "unicast": {
688 "advertise_networks": [
689 {"network": NETWORK["ipv4"], "delete": True}
690 ]
691 }
692 },
693 "ipv6": {
694 "unicast": {
695 "advertise_networks": [
696 {"network": NETWORK["ipv6"], "delete": True}
697 ]
698 }
699 },
700 }
701 }
702 }
703 }
704
705 logger.info("Withdraw advertised networks")
706 result = create_router_bgp(tgen, topo, input_dict_3)
707 assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)
708
709 for addr_type in ADDR_TYPES:
710 input_dict = {"r3": {"static_routes": [{"network": NETWORK[addr_type]}]}}
711
712 logger.info("Verifying %s routes on r3", addr_type)
713 result = verify_rib(
714 tgen,
715 addr_type,
716 dut,
717 input_dict,
718 next_hop=[],
719 protocol=protocol,
720 expected=False,
721 )
722 assert (
723 result is not True
724 ), "Testcase {} : Failed \n Expected: Routes still present in {} RIB. Found: {}".format(
725 tc_name, dut, result
726 )
727
728 static_or_nw(tgen, topo, tc_name, "advertise_nw", "r2")
729 for addr_type in ADDR_TYPES:
730 input_dict = {"r3": {"static_routes": [{"network": NETWORK[addr_type]}]}}
731 logger.info("Verifying %s routes on r3", addr_type)
732 result = verify_rib(
733 tgen,
734 addr_type,
735 dut,
736 input_dict,
737 next_hop=NEXT_HOPS[addr_type],
738 protocol=protocol,
739 )
740 assert result is True, "Testcase {} : Failed \n Error: {}".format(
741 tc_name, result
742 )
743
744
745 if __name__ == "__main__":
746 args = ["-s"] + sys.argv[1:]
747 sys.exit(pytest.main(args))