]> git.proxmox.com Git - mirror_frr.git/blob - tests/topotests/bgp_ecmp_topo2/test_ebgp_ecmp_topo2.py
Merge pull request #12818 from imzyxwvu/fix/other-table-inactive
[mirror_frr.git] / tests / topotests / bgp_ecmp_topo2 / test_ebgp_ecmp_topo2.py
1 #!/usr/bin/env python
2 # SPDX-License-Identifier: ISC
3
4 #
5 # Copyright (c) 2019 by VMware, Inc. ("VMware")
6 # Used Copyright (c) 2018 by Network Device Education Foundation, Inc.
7 # ("NetDEF") in this file.
8 #
9
10
11 """
12 Following tests are covered to test ecmp functionality on EBGP.
13 1. Verify routes installed as per maximum-paths configuration (8/16/32)
14 2. Disable/shut the nexthops of selected paths and verify the other nexthops
15 are installed in the RIB of DUT. Enable interfaces and verify RIB count.
16 3. Verify BGP table and RIB in DUT after clear BGP routes and neighbors.
17 4. Verify routes are cleared from BGP and RIB table of DUT when
18 redistribute static configuration is removed.
19 5. Shut BGP neighbors one by one and verify BGP and routing table updated
20 accordingly in DUT
21 6. Delete static routes and verify routes are cleared from BGP table and RIB
22 of DUT.
23 7. Verify routes are cleared from BGP and RIB table of DUT when advertise
24 network configuration is removed.
25 """
26 import os
27 import sys
28 import time
29 import pytest
30
31 # Save the Current Working Directory to find configuration files.
32 CWD = os.path.dirname(os.path.realpath(__file__))
33 sys.path.append(os.path.join(CWD, "../"))
34 sys.path.append(os.path.join(CWD, "../../"))
35
36 # pylint: disable=C0413
37 # Import topogen and topotest helpers
38 from lib.topogen import Topogen, get_topogen
39
40 from lib.common_config import (
41 start_topology,
42 write_test_header,
43 write_test_footer,
44 verify_rib,
45 create_static_routes,
46 check_address_types,
47 interface_status,
48 reset_config_on_routers,
49 required_linux_kernel_version,
50 )
51 from lib.topolog import logger
52 from lib.bgp import verify_bgp_convergence, create_router_bgp, clear_bgp
53 from lib.topojson import build_config_from_json
54
55
# Apply the bgpd and staticd pytest marks to every test in this module.
pytestmark = [
    pytest.mark.bgpd,
    pytest.mark.staticd,
]
57
58
# Module-wide state shared by setup_module() and the test cases.

# Next-hop addresses per address family; populated by setup_module().
NEXT_HOPS = dict(ipv4=[], ipv6=[])

# Interface names of the links between r2 and r3, as seen from each router;
# populated by setup_module().
INTF_LIST_R2 = []
INTF_LIST_R3 = []

# Prefix (per address family) that r2 redistributes or advertises.
NETWORK = dict(ipv4="11.0.20.1/32", ipv6="1::/64")

# Next hop (per address family) used for the static routes created on r2.
NEXT_HOP_IP = dict(ipv4="10.0.0.1", ipv6="fd00::1")

# BGP convergence flag; starts out False.
BGP_CONVERGENCE = False
66
67
def setup_module(mod):
    """
    Set up the pytest environment for this module.

    Builds the topology from ``ebgp_ecmp_topo2.json``, starts it, applies the
    JSON configuration, asserts that BGP has converged and then fills the
    module-level ``NEXT_HOPS``, ``INTF_LIST_R2`` and ``INTF_LIST_R3`` from the
    link data between r2 and r3.

    * `mod`: the test module object; its ``__name__`` is passed to Topogen
    """
    # BGP_CONVERGENCE must be declared global here, otherwise the assignment
    # below only creates a local and the module-level flag stays False.
    global topo, ADDR_TYPES, BGP_CONVERGENCE
    global NEXT_HOPS, INTF_LIST_R2, INTF_LIST_R3

    # Required linux kernel version for this suite to run.
    result = required_linux_kernel_version("4.15")
    if result is not True:
        pytest.skip("Kernel requirements are not met, kernel version should be >=4.15")

    testsuite_run_time = time.asctime(time.localtime(time.time()))
    logger.info("Testsuite start time: {}".format(testsuite_run_time))
    logger.info("=" * 40)

    logger.info("Running setup_module to create topology")

    # This function initiates the topology build with Topogen...
    json_file = "{}/ebgp_ecmp_topo2.json".format(CWD)
    tgen = Topogen(json_file, mod.__name__)
    topo = tgen.json_topo

    # Starting topology, create tmp files which are loaded to routers
    # to start daemons and then start routers
    start_topology(tgen)

    # Creating configuration from JSON
    build_config_from_json(tgen, topo)

    # Don't run this test if we have any failure.
    if tgen.routers_have_failure():
        pytest.skip(tgen.errors)

    ADDR_TYPES = check_address_types()

    # Api call verify whether BGP is converged
    BGP_CONVERGENCE = verify_bgp_convergence(tgen, topo)
    assert BGP_CONVERGENCE is True, "setup_module :Failed \n Error: {}".format(
        BGP_CONVERGENCE
    )

    # Links on r2 whose name mentions r3.
    link_data = [
        val for links, val in topo["routers"]["r2"]["links"].items() if "r3" in links
    ]
    for adt in ADDR_TYPES:
        # Strip the prefix length, keeping only the address part.
        NEXT_HOPS[adt] = [val[adt].split("/")[0] for val in link_data]
        if adt == "ipv4":
            # Order numerically by the third octet.
            NEXT_HOPS[adt] = sorted(NEXT_HOPS[adt], key=lambda x: int(x.split(".")[2]))
        elif adt == "ipv6":
            # Order numerically by the third-from-last ':'-separated field,
            # parsed as hexadecimal.
            NEXT_HOPS[adt] = sorted(
                NEXT_HOPS[adt], key=lambda x: int(x.split(":")[-3], 16)
            )

    # Interface names are ordered by the number following "eth".
    INTF_LIST_R2 = [val["interface"].split("/")[0] for val in link_data]
    INTF_LIST_R2 = sorted(INTF_LIST_R2, key=lambda x: int(x.split("eth")[1]))

    # Links on r3 whose name mentions r2.
    link_data = [
        val for links, val in topo["routers"]["r3"]["links"].items() if "r2" in links
    ]
    INTF_LIST_R3 = [val["interface"].split("/")[0] for val in link_data]
    INTF_LIST_R3 = sorted(INTF_LIST_R3, key=lambda x: int(x.split("eth")[1]))

    logger.info("Running setup_module() done")
137
138
def teardown_module():
    """Tear down the pytest environment by stopping the current topology."""
    logger.info("Running teardown_module to delete topology")

    # Fetch the active Topogen instance and stop its topology.
    get_topogen().stop_topology()
152
153
def static_or_nw(tgen, topo, tc_name, test_type, dut):
    """
    Originate NETWORK from router `dut`, by the method named in `test_type`.

    * `tgen`: topogen object
    * `topo`: topology dictionary passed on to create_router_bgp()
    * `tc_name`: test case name, used in assertion messages
    * `test_type`: "redist_static" creates static routes for NETWORK on `dut`
      and redistributes static into BGP; "advertise_nw" advertises NETWORK
      through BGP network configuration; any other value does nothing
    * `dut`: name of the router to configure
    """
    families = ("ipv4", "ipv6")
    failure = "Testcase {} : Failed \n Error: {}"

    if test_type == "advertise_nw":
        advertise_cfg = {
            dut: {
                "bgp": {
                    "address_family": {
                        af: {
                            "unicast": {
                                "advertise_networks": [{"network": NETWORK[af]}]
                            }
                        }
                        for af in families
                    }
                }
            }
        }

        logger.info(
            "Advertising networks %s %s from router %s",
            NETWORK["ipv4"],
            NETWORK["ipv6"],
            dut,
        )
        outcome = create_router_bgp(tgen, topo, advertise_cfg)
        assert outcome is True, failure.format(tc_name, outcome)
        return

    if test_type != "redist_static":
        return

    # Step 1: static routes for both address families on the DUT.
    static_cfg = {
        dut: {
            "static_routes": [
                {"network": NETWORK[af], "next_hop": NEXT_HOP_IP[af]}
                for af in families
            ]
        }
    }
    logger.info("Configuring static route on router %s", dut)
    outcome = create_static_routes(tgen, static_cfg)
    assert outcome is True, failure.format(tc_name, outcome)

    # Step 2: redistribute static into BGP for both address families.
    redistribute_cfg = {
        dut: {
            "bgp": {
                "address_family": {
                    af: {"unicast": {"redistribute": [{"redist_type": "static"}]}}
                    for af in families
                }
            }
        }
    }
    logger.info("Configuring redistribute static route on router %s", dut)
    outcome = create_router_bgp(tgen, topo, redistribute_cfg)
    assert outcome is True, failure.format(tc_name, outcome)
222
223
@pytest.mark.parametrize("ecmp_num", ["8", "16", "32"])
@pytest.mark.parametrize("test_type", ["redist_static", "advertise_nw"])
def test_modify_ecmp_max_paths(request, ecmp_num, test_type):
    """
    Verify routes installed as per maximum-paths
    configuration (8/16/32).

    r2 originates NETWORK (method chosen by `test_type`), r3 is configured
    with eBGP maximum-paths `ecmp_num`, and the nexthop count of the BGP
    route in r3's RIB is checked for every address type.
    """

    tc_name = request.node.name
    write_test_header(tc_name)
    tgen = get_topogen()

    reset_config_on_routers(tgen)

    static_or_nw(tgen, topo, tc_name, test_type, "r2")

    # Same maximum-paths setting for both address families on r3.
    max_paths_cfg = {
        "r3": {
            "bgp": {
                "address_family": {
                    af: {"unicast": {"maximum_paths": {"ebgp": ecmp_num}}}
                    for af in ("ipv4", "ipv6")
                }
            }
        }
    }

    logger.info("Configuring bgp maximum-paths %s on router r3", ecmp_num)
    outcome = create_router_bgp(tgen, topo, max_paths_cfg)
    assert outcome is True, "Testcase {} : Failed \n Error: {}".format(tc_name, outcome)

    # Verifying RIB routes
    dut = "r3"
    hop_limit = int(ecmp_num)

    for af in ADDR_TYPES:
        expected_routes = {"r3": {"static_routes": [{"network": NETWORK[af]}]}}

        logger.info("Verifying %s routes on r3", af)

        # Only test the count of nexthops; the actual nexthop addresses
        # can vary and are not deterministic.
        outcome = verify_rib(
            tgen,
            af,
            dut,
            expected_routes,
            next_hop=NEXT_HOPS[af][:hop_limit],
            protocol="bgp",
            count_only=True,
        )

        assert outcome is True, "Testcase {} : Failed \n Error: {}".format(
            tc_name, outcome
        )

    write_test_footer(tc_name)
294
295
@pytest.mark.parametrize("ecmp_num", ["8", "16", "32"])
@pytest.mark.parametrize("test_type", ["redist_static", "advertise_nw"])
def test_ecmp_after_clear_bgp(request, ecmp_num, test_type):
    """
    Verify BGP table and RIB in DUT after clear BGP routes and neighbors.

    The RIB of r3 is checked once after r2 originates NETWORK, then BGP is
    cleared on r3 for every address type, convergence is re-verified and
    the same RIB check is repeated.
    """

    tc_name = request.node.name
    write_test_header(tc_name)
    tgen = get_topogen()

    reset_config_on_routers(tgen)

    dut = "r3"
    hop_limit = int(ecmp_num)
    failure = "Testcase {} : Failed \n Error: {}"

    def check_rib_on_dut():
        # For every address type, verify the BGP route for NETWORK in r3's
        # RIB, passing the first `hop_limit` entries of NEXT_HOPS as the
        # next hops to check.
        for af in ADDR_TYPES:
            expected_routes = {"r3": {"static_routes": [{"network": NETWORK[af]}]}}

            logger.info("Verifying %s routes on r3", af)
            outcome = verify_rib(
                tgen,
                af,
                dut,
                expected_routes,
                next_hop=NEXT_HOPS[af][:hop_limit],
                protocol="bgp",
            )
            assert outcome is True, failure.format(tc_name, outcome)

    static_or_nw(tgen, topo, tc_name, test_type, "r2")
    check_rib_on_dut()

    # Clear BGP
    for af in ADDR_TYPES:
        clear_bgp(tgen, af, dut)

    # Verify BGP convergence
    outcome = verify_bgp_convergence(tgen, topo)
    assert outcome is True, failure.format(tc_name, outcome)

    check_rib_on_dut()

    write_test_footer(tc_name)
352
353
def test_ecmp_remove_redistribute_static(request):
    """
    Verify routes are cleared from BGP and RIB table of DUT when
    redistribute static configuration is removed.

    After the removal is verified, redistribute static is configured again
    on r2 and the routes are verified on r3 once more.
    """

    tc_name = request.node.name
    write_test_header(tc_name)
    tgen = get_topogen()

    dut = "r3"
    protocol = "bgp"
    failure = "Testcase {} : Failed \n Error: {}"

    def redistribute_static_cfg(delete):
        # Build the r2 BGP input enabling (or, with delete=True, removing)
        # "redistribute static" for both address families.
        def entry():
            item = {"redist_type": "static"}
            if delete:
                item["delete"] = True
            return item

        return {
            "r2": {
                "bgp": {
                    "address_family": {
                        af: {"unicast": {"redistribute": [entry()]}}
                        for af in ("ipv4", "ipv6")
                    }
                }
            }
        }

    def check_routes_present():
        # Verify the BGP route for NETWORK on r3 against all of NEXT_HOPS,
        # for every address type.
        for af in ADDR_TYPES:
            expected_routes = {"r3": {"static_routes": [{"network": NETWORK[af]}]}}

            logger.info("Verifying %s routes on r3", af)
            outcome = verify_rib(
                tgen,
                af,
                dut,
                expected_routes,
                next_hop=NEXT_HOPS[af],
                protocol=protocol,
            )
            assert outcome is True, failure.format(tc_name, outcome)

    reset_config_on_routers(tgen)
    static_or_nw(tgen, topo, tc_name, "redist_static", "r2")
    check_routes_present()

    removal_cfg = redistribute_static_cfg(delete=True)
    logger.info("Remove redistribute static")
    outcome = create_router_bgp(tgen, topo, removal_cfg)
    assert outcome is True, failure.format(tc_name, outcome)

    for af in ADDR_TYPES:
        expected_routes = {"r3": {"static_routes": [{"network": NETWORK[af]}]}}

        logger.info("Verifying %s routes on r3 are deleted", af)
        # expected=False: this verification is meant to not succeed.
        outcome = verify_rib(
            tgen,
            af,
            dut,
            expected_routes,
            next_hop=[],
            protocol=protocol,
            expected=False,
        )
        assert (
            outcome is not True
        ), "Testcase {} : Failed \n Expected: Routes still present in {} RIB. Found: {}".format(
            tc_name, dut, outcome
        )

    logger.info("Enable redistribute static")
    outcome = create_router_bgp(tgen, topo, redistribute_static_cfg(delete=False))
    assert outcome is True, failure.format(tc_name, outcome)

    check_routes_present()

    write_test_footer(tc_name)
463
464
@pytest.mark.parametrize("test_type", ["redist_static", "advertise_nw"])
def test_ecmp_shut_bgp_neighbor(request, test_type):
    """
    Shut BGP neighbors and verify BGP and routing table updated
    accordingly in DUT.

    Steps: originate NETWORK from r2 and verify it on r3, shut r2
    interfaces towards r3 in slices of 16 and re-verify, then bring all
    interfaces back up, originate NETWORK again and verify once more.

    NOTE(review): the shutdown loop iterates over
    ``range(len(INTF_LIST_R2) + 1, 16)``, which is empty whenever
    INTF_LIST_R2 holds 15 or more interfaces, so the shutdown phase may
    never execute. Confirm the intended range (and the nexthops expected
    after each slice is shut) on a live topology before changing it.
    """

    tc_name = request.node.name
    write_test_header(tc_name)
    tgen = get_topogen()

    logger.info(INTF_LIST_R2)
    # Verifying RIB routes
    dut = "r3"
    protocol = "bgp"

    reset_config_on_routers(tgen)
    static_or_nw(tgen, topo, tc_name, test_type, "r2")

    for addr_type in ADDR_TYPES:
        input_dict = {"r3": {"static_routes": [{"network": NETWORK[addr_type]}]}}

        logger.info("Verifying %s routes on r3", addr_type)
        result = verify_rib(
            tgen,
            addr_type,
            dut,
            input_dict,
            next_hop=NEXT_HOPS[addr_type],
            protocol=protocol,
        )
        assert result is True, "Testcase {} : Failed \n Error: {}".format(
            tc_name, result
        )

    # NOTE(review): see the docstring - this range is likely empty.
    for intf_num in range(len(INTF_LIST_R2) + 1, 16):
        intf_val = INTF_LIST_R2[intf_num : intf_num + 16]

        input_dict_1 = {"r2": {"interface_list": [intf_val], "status": "down"}}
        logger.info("Shutting down neighbor interface {} on r2".format(intf_val))
        result = interface_status(tgen, topo, input_dict_1)
        assert result is True, "Testcase {} : Failed \n Error: {}".format(
            tc_name, result
        )

        for addr_type in ADDR_TYPES:
            if intf_num + 16 < 32:
                check_hops = NEXT_HOPS[addr_type]
            else:
                check_hops = []

            input_dict = {"r3": {"static_routes": [{"network": NETWORK[addr_type]}]}}
            logger.info("Verifying %s routes on r3", addr_type)
            result = verify_rib(
                tgen, addr_type, dut, input_dict, next_hop=check_hops, protocol=protocol
            )
            assert result is True, "Testcase {} : Failed \n Error: {}".format(
                tc_name, result
            )

    input_dict_1 = {"r2": {"interface_list": INTF_LIST_R2, "status": "up"}}

    # The original message left its "{}" placeholder unfilled; log the
    # interfaces that are actually being enabled.
    logger.info("Enabling all neighbor interface %s on r2", INTF_LIST_R2)
    result = interface_status(tgen, topo, input_dict_1)
    assert result is True, "Testcase {} : Failed \n Error: {}".format(tc_name, result)

    static_or_nw(tgen, topo, tc_name, test_type, "r2")
    for addr_type in ADDR_TYPES:
        input_dict = {"r3": {"static_routes": [{"network": NETWORK[addr_type]}]}}

        logger.info("Verifying %s routes on r3", addr_type)
        result = verify_rib(
            tgen,
            addr_type,
            dut,
            input_dict,
            next_hop=NEXT_HOPS[addr_type],
            protocol=protocol,
        )
        assert result is True, "Testcase {} : Failed \n Error: {}".format(
            tc_name, result
        )

    write_test_footer(tc_name)
547
548
def test_ecmp_remove_static_route(request):
    """
    Delete static routes and verify routes are cleared from BGP table,
    and RIB of DUT.

    Steps: redistribute static routes for NETWORK from r2 and verify them on
    r3; delete the static routes on r2 one address type at a time, checking
    each time that the verification on r3 no longer succeeds; then re-create
    the static routes and verify them on r3 again.
    """

    tc_name = request.node.name
    write_test_header(tc_name)
    tgen = get_topogen()

    # Verifying RIB routes
    dut = "r3"
    protocol = "bgp"

    reset_config_on_routers(tgen)

    static_or_nw(tgen, topo, tc_name, "redist_static", "r2")
    for addr_type in ADDR_TYPES:
        input_dict_1 = {"r3": {"static_routes": [{"network": NETWORK[addr_type]}]}}

        logger.info("Verifying %s routes on r3", addr_type)
        result = verify_rib(
            tgen,
            addr_type,
            dut,
            input_dict_1,
            next_hop=NEXT_HOPS[addr_type],
            protocol=protocol,
        )
        assert result is True, "Testcase {} : Failed \n Error: {}".format(
            tc_name, result
        )

    for addr_type in ADDR_TYPES:
        input_dict_2 = {
            "r2": {
                "static_routes": [
                    {
                        "network": NETWORK[addr_type],
                        "next_hop": NEXT_HOP_IP[addr_type],
                        "delete": True,
                    }
                ]
            }
        }

        logger.info("Remove static routes")
        result = create_static_routes(tgen, input_dict_2)
        assert result is True, "Testcase {} : Failed \n Error: {}".format(
            tc_name, result
        )

        logger.info("Verifying %s routes on r3 are removed", addr_type)
        # expected=False: this verification is meant to not succeed.
        result = verify_rib(
            tgen,
            addr_type,
            dut,
            input_dict_2,
            next_hop=[],
            protocol=protocol,
            expected=False,
        )
        assert (
            result is not True
        ), "Testcase {} : Failed \n Expected: Routes still present in {} RIB. Found: {}".format(
            tc_name, dut, result
        )

    for addr_type in ADDR_TYPES:
        # Enable static routes
        input_dict_4 = {
            "r2": {
                "static_routes": [
                    {"network": NETWORK[addr_type], "next_hop": NEXT_HOP_IP[addr_type]}
                ]
            }
        }

        logger.info("Enable static route")
        result = create_static_routes(tgen, input_dict_4)
        assert result is True, "Testcase {} : Failed \n Error: {}".format(
            tc_name, result
        )

        logger.info("Verifying %s routes on r3", addr_type)
        result = verify_rib(
            tgen,
            addr_type,
            dut,
            input_dict_4,
            next_hop=NEXT_HOPS[addr_type],
            protocol=protocol,
        )
        assert result is True, "Testcase {} : Failed \n Error: {}".format(
            tc_name, result
        )

    # Close the test case like every other test in this module does; the
    # footer call was missing here.
    write_test_footer(tc_name)
645
646
def test_ecmp_remove_nw_advertise(request):
    """
    Verify routes are cleared from BGP and RIB table of DUT,
    when advertise network configuration is removed.

    After the withdrawal is verified, the networks are advertised from r2
    again and the routes are verified on r3 once more.
    """

    tc_name = request.node.name
    write_test_header(tc_name)
    tgen = get_topogen()

    dut = "r3"
    protocol = "bgp"
    failure = "Testcase {} : Failed \n Error: {}"

    def check_routes_present():
        # Verify the BGP route for NETWORK on r3 against all of NEXT_HOPS,
        # for every address type.
        for af in ADDR_TYPES:
            expected_routes = {"r3": {"static_routes": [{"network": NETWORK[af]}]}}

            logger.info("Verifying %s routes on r3", af)
            outcome = verify_rib(
                tgen,
                af,
                dut,
                expected_routes,
                next_hop=NEXT_HOPS[af],
                protocol=protocol,
            )
            assert outcome is True, failure.format(tc_name, outcome)

    reset_config_on_routers(tgen)
    static_or_nw(tgen, topo, tc_name, "advertise_nw", "r2")
    check_routes_present()

    # Delete the advertised network for both address families on r2.
    withdraw_cfg = {
        "r2": {
            "bgp": {
                "address_family": {
                    af: {
                        "unicast": {
                            "advertise_networks": [
                                {"network": NETWORK[af], "delete": True}
                            ]
                        }
                    }
                    for af in ("ipv4", "ipv6")
                }
            }
        }
    }

    logger.info("Withdraw advertised networks")
    outcome = create_router_bgp(tgen, topo, withdraw_cfg)
    assert outcome is True, failure.format(tc_name, outcome)

    for af in ADDR_TYPES:
        expected_routes = {"r3": {"static_routes": [{"network": NETWORK[af]}]}}

        logger.info("Verifying %s routes on r3", af)
        # expected=False: this verification is meant to not succeed.
        outcome = verify_rib(
            tgen,
            af,
            dut,
            expected_routes,
            next_hop=[],
            protocol=protocol,
            expected=False,
        )
        assert (
            outcome is not True
        ), "Testcase {} : Failed \n Expected: Routes still present in {} RIB. Found: {}".format(
            tc_name, dut, outcome
        )

    static_or_nw(tgen, topo, tc_name, "advertise_nw", "r2")
    check_routes_present()

    write_test_footer(tc_name)
742
743
if __name__ == "__main__":
    # Run this file through pytest with "-s" prepended to whatever
    # command-line arguments were given, and exit with pytest's return code.
    sys.exit(pytest.main(["-s"] + sys.argv[1:]))