]> git.proxmox.com Git - mirror_frr.git/blob - tests/topotests/bgp_aggregate_address_topo1/test_bgp_aggregate_address_topo1.py
Merge pull request #12798 from donaldsharp/rib_match_multicast
[mirror_frr.git] / tests / topotests / bgp_aggregate_address_topo1 / test_bgp_aggregate_address_topo1.py
1 #!/usr/bin/env python
2 # SPDX-License-Identifier: ISC
3
4 #
5 # test_bgp_aggregate_address_topo1.py
6 # Part of NetDEF Topology Tests
7 #
8 # Copyright (c) 2020 by
9 # Network Device Education Foundation, Inc. ("NetDEF")
10 #
11
12 """
13 Test BGP aggregate address features.
14 """
15
16 import os
17 import sys
18 import pytest
19 import functools
20
21 CWD = os.path.dirname(os.path.realpath(__file__))
22 sys.path.append(os.path.join(CWD, "../"))
23
24 # pylint: disable=C0413
25 from lib import topotest
26 from lib.topogen import Topogen, TopoRouter, get_topogen
27 from lib.topolog import logger
28
29 pytestmark = [pytest.mark.bgpd]
30
31
32 def build_topo(tgen):
33 r1 = tgen.add_router("r1")
34 r2 = tgen.add_router("r2")
35 peer1 = tgen.add_exabgp_peer("peer1", ip="10.0.0.2", defaultRoute="via 10.0.0.1")
36
37 switch = tgen.add_switch("s1")
38 switch.add_link(r1)
39 switch.add_link(peer1)
40
41 switch = tgen.add_switch("s2")
42 switch.add_link(r1)
43 switch.add_link(r2)
44
45
46 def setup_module(mod):
47 tgen = Topogen(build_topo, mod.__name__)
48 tgen.start_topology()
49
50 router = tgen.gears["r1"]
51 router.load_config(TopoRouter.RD_ZEBRA, os.path.join(CWD, "r1/zebra.conf"))
52 router.load_config(TopoRouter.RD_BGP, os.path.join(CWD, "r1/bgpd.conf"))
53 router.start()
54
55 router = tgen.gears["r2"]
56 router.load_config(TopoRouter.RD_ZEBRA, os.path.join(CWD, "r2/zebra.conf"))
57 router.load_config(TopoRouter.RD_BGP, os.path.join(CWD, "r2/bgpd.conf"))
58 router.start()
59
60 peer = tgen.gears["peer1"]
61 peer.start(os.path.join(CWD, "peer1"), os.path.join(CWD, "exabgp.env"))
62
63
64 def teardown_module(mod):
65 tgen = get_topogen()
66 tgen.stop_topology()
67
68
69 def expect_route(router_name, routes_expected):
70 "Helper function to avoid repeated code."
71 tgen = get_topogen()
72 test_func = functools.partial(
73 topotest.router_json_cmp,
74 tgen.gears[router_name],
75 "show ip route json",
76 routes_expected,
77 )
78 _, result = topotest.run_and_expect(test_func, None, count=120, wait=1)
79 assertmsg = '"{}" BGP convergence failure'.format(router_name)
80 assert result is None, assertmsg
81
82
83 def test_expect_convergence():
84 "Test that BGP protocol converged."
85
86 tgen = get_topogen()
87 if tgen.routers_have_failure():
88 pytest.skip(tgen.errors)
89
90 logger.info("waiting for protocols to converge")
91
92 def expect_loopback_route(router, iptype, route, proto):
93 "Wait until route is present on RIB for protocol."
94 logger.info("waiting route {} in {}".format(route, router))
95 test_func = functools.partial(
96 topotest.router_json_cmp,
97 tgen.gears[router],
98 "show {} route json".format(iptype),
99 {route: [{"protocol": proto}]},
100 )
101 _, result = topotest.run_and_expect(test_func, None, count=130, wait=1)
102 assertmsg = '"{}" BGP convergence failure'.format(router)
103 assert result is None, assertmsg
104
105 expect_loopback_route("r2", "ip", "10.254.254.1/32", "bgp")
106 expect_loopback_route("r2", "ip", "10.254.254.3/32", "bgp")
107
108
109 def test_bgp_aggregate_address_matching_med_only():
110 "Test that the command matching-MED-only works."
111
112 tgen = get_topogen()
113 if tgen.routers_have_failure():
114 pytest.skip(tgen.errors)
115
116 routes_expected = {
117 # All MED matches, aggregation must exist.
118 "192.168.0.0/24": [{"protocol": "bgp", "metric": 0}],
119 "192.168.0.1/32": [{"protocol": "bgp", "metric": 10}],
120 "192.168.0.2/32": [{"protocol": "bgp", "metric": 10}],
121 "192.168.0.3/32": [{"protocol": "bgp", "metric": 10}],
122 # Non matching MED: aggregation must not exist.
123 "192.168.1.0/24": None,
124 "192.168.1.1/32": [{"protocol": "bgp", "metric": 10}],
125 "192.168.1.2/32": [{"protocol": "bgp", "metric": 10}],
126 "192.168.1.3/32": [{"protocol": "bgp", "metric": 20}],
127 }
128
129 test_func = functools.partial(
130 topotest.router_json_cmp,
131 tgen.gears["r2"],
132 "show ip route json",
133 routes_expected,
134 )
135 _, result = topotest.run_and_expect(test_func, None, count=20, wait=1)
136 assertmsg = '"r2" BGP convergence failure'
137 assert result is None, assertmsg
138
139
140 def test_bgp_aggregate_address_match_and_suppress():
141 "Test that the command matching-MED-only with suppression works."
142
143 tgen = get_topogen()
144 if tgen.routers_have_failure():
145 pytest.skip(tgen.errors)
146
147 tgen.gears["r1"].vtysh_multicmd(
148 """
149 configure terminal
150 router bgp 65000
151 address-family ipv4 unicast
152 no aggregate-address 192.168.0.0/24 matching-MED-only
153 no aggregate-address 192.168.1.0/24 matching-MED-only
154 aggregate-address 192.168.0.0/24 matching-MED-only summary-only
155 aggregate-address 192.168.1.0/24 matching-MED-only summary-only
156 """
157 )
158
159 routes_expected = {
160 # All MED matches, aggregation must exist.
161 "192.168.0.0/24": [{"protocol": "bgp", "metric": 0}],
162 "192.168.0.1/32": None,
163 "192.168.0.2/32": None,
164 "192.168.0.3/32": None,
165 # Non matching MED: aggregation must not exist.
166 "192.168.1.0/24": None,
167 "192.168.1.1/32": [{"protocol": "bgp", "metric": 10}],
168 "192.168.1.2/32": [{"protocol": "bgp", "metric": 10}],
169 "192.168.1.3/32": [{"protocol": "bgp", "metric": 20}],
170 }
171
172 test_func = functools.partial(
173 topotest.router_json_cmp,
174 tgen.gears["r2"],
175 "show ip route json",
176 routes_expected,
177 )
178 _, result = topotest.run_and_expect(test_func, None, count=120, wait=1)
179 assertmsg = '"r2" BGP convergence failure'
180 assert result is None, assertmsg
181
182
183 def test_bgp_aggregate_address_suppress_map():
184 "Test that the command suppress-map works."
185
186 tgen = get_topogen()
187 if tgen.routers_have_failure():
188 pytest.skip(tgen.errors)
189
190 expect_route(
191 "r2",
192 {
193 "192.168.2.0/24": [{"protocol": "bgp"}],
194 "192.168.2.1/32": None,
195 "192.168.2.2/32": [{"protocol": "bgp"}],
196 "192.168.2.3/32": [{"protocol": "bgp"}],
197 },
198 )
199
200 # Change route map and test again.
201 tgen.gears["r1"].vtysh_multicmd(
202 """
203 configure terminal
204 router bgp 65000
205 address-family ipv4 unicast
206 no aggregate-address 192.168.2.0/24 suppress-map rm-sup-one
207 aggregate-address 192.168.2.0/24 suppress-map rm-sup-two
208 """
209 )
210
211 expect_route(
212 "r2",
213 {
214 "192.168.2.0/24": [{"protocol": "bgp"}],
215 "192.168.2.1/32": [{"protocol": "bgp"}],
216 "192.168.2.2/32": None,
217 "192.168.2.3/32": [{"protocol": "bgp"}],
218 },
219 )
220
221
222 def test_bgp_aggregate_address_suppress_map_update_route_map():
223 "Test that the suppress-map late route map creation works."
224 tgen = get_topogen()
225 if tgen.routers_have_failure():
226 pytest.skip(tgen.errors)
227
228 tgen.gears["r1"].vtysh_multicmd(
229 """
230 configure terminal
231 router bgp 65000
232 address-family ipv4 unicast
233 no aggregate-address 192.168.2.0/24 suppress-map rm-sup-two
234 aggregate-address 192.168.2.0/24 suppress-map rm-sup-three
235 """
236 )
237
238 expect_route(
239 "r2",
240 {
241 "192.168.2.0/24": [{"protocol": "bgp"}],
242 "192.168.2.1/32": [{"protocol": "bgp"}],
243 "192.168.2.2/32": [{"protocol": "bgp"}],
244 "192.168.2.3/32": [{"protocol": "bgp"}],
245 },
246 )
247
248 # Create missing route map and test again.
249 tgen.gears["r1"].vtysh_multicmd(
250 """
251 configure terminal
252 route-map rm-sup-three permit 10
253 match ip address acl-sup-three
254 """
255 )
256
257 expect_route(
258 "r2",
259 {
260 "192.168.2.0/24": [{"protocol": "bgp"}],
261 "192.168.2.1/32": [{"protocol": "bgp"}],
262 "192.168.2.2/32": [{"protocol": "bgp"}],
263 "192.168.2.3/32": None,
264 },
265 )
266
267
268 def test_memory_leak():
269 "Run the memory leak test and report results."
270 tgen = get_topogen()
271 if not tgen.is_memleak_enabled():
272 pytest.skip("Memory leak test/report is disabled")
273
274 tgen.report_memory_leaks()
275
276
277 if __name__ == "__main__":
278 args = ["-s"] + sys.argv[1:]
279 sys.exit(pytest.main(args))