]> git.proxmox.com Git - mirror_frr.git/blame - tests/topotests/bgp_accept_own/test_bgp_accept_own.py
*: auto-convert to SPDX License IDs
[mirror_frr.git] / tests / topotests / bgp_accept_own / test_bgp_accept_own.py
CommitLineData
46dbf9d0 1#!/usr/bin/env python
acddc0ed 2# SPDX-License-Identifier: ISC
46dbf9d0
DA
3
4#
5# Copyright (c) 2022 by
6# Donatas Abraitis <donatas@opensourcerouting.org>
7#
46dbf9d0
DA
8
9"""
10
11"""
12
13import os
14import sys
15import json
16import pytest
17import functools
18
19CWD = os.path.dirname(os.path.realpath(__file__))
20sys.path.append(os.path.join(CWD, "../"))
21
22# pylint: disable=C0413
23from lib import topotest
24from lib.topogen import Topogen, TopoRouter, get_topogen
25from lib.common_config import step
26
27pytestmark = [pytest.mark.bgpd]
28
29
def build_topo(tgen):
    """Declare the routers and switches that make up the test topology.

    Four routers are created (ce1, ce2, pe1, rr1) and wired as follows:

    * s1: ce1 -- pe1
    * s2: ce2 -- pe1
    * s3: pe1 -- rr1
    * s4: pe1 only (no second router is attached)

    :param tgen: topology generator object the routers/switches are added to.
    """
    for router_name in ("ce1", "ce2", "pe1", "rr1"):
        tgen.add_router(router_name)

    # Keep this order stable: setup_module() refers to pe1-eth0..pe1-eth3 by
    # name (presumably interfaces are numbered in link-creation order --
    # TODO confirm against lib.topogen).
    wiring = (
        ("s1", ("ce1", "pe1")),
        ("s2", ("ce2", "pe1")),
        ("s3", ("pe1", "rr1")),
        ("s4", ("pe1",)),
    )
    for switch_name, attached_routers in wiring:
        switch = tgen.add_switch(switch_name)
        for router_name in attached_routers:
            switch.add_link(tgen.gears[router_name])
50
46dbf9d0
DA
51
def setup_module(mod):
    """Build and start the topology for this test module.

    Steps performed:

    1. Create the topology from build_topo() and start it.
    2. On pe1, create the VRFs ``Customer`` (table 1001) and ``Service``
       (table 1002), bring them up, and enslave pe1-eth0 and pe1-eth3 to
       ``Customer`` and pe1-eth1 to ``Service``.
    3. Enable MPLS input on pe1-eth2 and rr1-eth0.
    4. Load the zebra, bgpd, ospfd and ldpd configuration of every router
       from ``<CWD>/<router name>/`` and start the routers.

    :param mod: the test module object handed in by pytest; only its
        ``__name__`` is used.
    """
    tgen = Topogen(build_topo, mod.__name__)
    tgen.start_topology()

    pe1 = tgen.gears["pe1"]
    rr1 = tgen.gears["rr1"]

    # VRF plumbing and MPLS input on pe1; executed in this exact order.
    pe1_commands = (
        "ip link add Customer type vrf table 1001",
        "ip link set up dev Customer",
        "ip link set pe1-eth0 master Customer",
        "ip link add Service type vrf table 1002",
        "ip link set up dev Service",
        "ip link set pe1-eth1 master Service",
        "ip link set pe1-eth3 master Customer",
        "sysctl -w net.mpls.conf.pe1-eth2.input=1",
    )
    for command in pe1_commands:
        pe1.run(command)

    rr1.run("sysctl -w net.mpls.conf.rr1-eth0.input=1")

    # Daemon -> per-router configuration file, loaded in this order.
    daemon_configs = (
        (TopoRouter.RD_ZEBRA, "zebra.conf"),
        (TopoRouter.RD_BGP, "bgpd.conf"),
        (TopoRouter.RD_OSPF, "ospfd.conf"),
        (TopoRouter.RD_LDP, "ldpd.conf"),
    )

    # The original loop wrapped this in enumerate() but never used the index.
    for rname, router in tgen.routers().items():
        for daemon, conf_file in daemon_configs:
            router.load_config(
                daemon, os.path.join(CWD, "{}/{}".format(rname, conf_file))
            )

    tgen.start_router()
86
87
def teardown_module(mod):
    """Stop the topology that setup_module() started.

    :param mod: the test module object handed in by pytest (unused).
    """
    get_topogen().stop_topology()
91
92
def test_bgp_accept_own():
    """Exercise the ``accept-own`` neighbor knob on pe1 towards 10.10.10.101.

    The test walks through these checks, each retried via
    topotest.run_and_expect(count=60, wait=1) until the comparison
    function returns None:

    1. Before the knob is set, pe1 reports 0 received / 5 sent IPv4 VPN
       prefixes for peer 10.10.10.101.
    2. After ``neighbor 10.10.10.101 accept-own`` is configured, pe1 reports
       5 received / 5 sent prefixes.
    3. 172.16.255.1/32 is present in vrf Service with the expected community
       and route-target list.
    4. 192.0.2.0/24 is installed in vrf Service with a next hop in vrf
       Customer.
    5. ce2 has a BGP route for 172.16.255.1/32 via 192.168.2.2.

    The test is skipped when any router already reported a failure.
    """
    tgen = get_topogen()

    if tgen.routers_have_failure():
        pytest.skip(tgen.errors)

    pe1 = tgen.gears["pe1"]
    ce2 = tgen.gears["ce2"]

    def _vtysh_json(router, command):
        # Run a vtysh "... json" command on the router and decode its output.
        return json.loads(router.vtysh_cmd(command))

    def _expect_match(check, failure_message):
        # Retry `check` until it returns None; fail with the given message
        # if it never does.
        test_func = functools.partial(check)
        _, result = topotest.run_and_expect(test_func, None, count=60, wait=1)
        assert result is None, failure_message

    step("Check if routes are not installed in PE1 from RR1 (due to ORIGINATOR_ID)")

    def _bgp_check_received_routes_due_originator_id():
        summary = _vtysh_json(pe1, "show bgp ipv4 vpn summary json")
        wanted = {"peers": {"10.10.10.101": {"pfxRcd": 0, "pfxSnt": 5}}}
        return topotest.json_cmp(summary, wanted)

    _expect_match(
        _bgp_check_received_routes_due_originator_id,
        "Failed, received routes from RR1 regardless ORIGINATOR_ID",
    )

    step("Enable ACCEPT_OWN for RR1")

    pe1.vtysh_cmd(
        """
    configure terminal
    router bgp 65001
     address-family ipv4 vpn
      neighbor 10.10.10.101 accept-own
    """
    )

    step("Check if we received routes due to ACCEPT_OWN from RR1")

    def _bgp_check_received_routes_with_modified_rts():
        summary = _vtysh_json(pe1, "show bgp ipv4 vpn summary json")
        wanted = {"peers": {"10.10.10.101": {"pfxRcd": 5, "pfxSnt": 5}}}
        return topotest.json_cmp(summary, wanted)

    _expect_match(
        _bgp_check_received_routes_with_modified_rts,
        "Failed, didn't receive routes from RR1 with ACCEPT_OWN enabled",
    )

    step(
        "Check if 172.16.255.1/32 is imported into vrf Service due to modified RT list at RR1"
    )

    def _bgp_check_received_routes_with_changed_rts():
        route = _vtysh_json(
            pe1, "show bgp vrf Service ipv4 unicast 172.16.255.1/32 json"
        )
        wanted_path = {
            "community": {"string": "65001:111"},
            "extendedCommunity": {"string": "RT:192.168.1.2:2 RT:192.168.2.2:2"},
        }
        return topotest.json_cmp(route, {"paths": [wanted_path]})

    _expect_match(
        _bgp_check_received_routes_with_changed_rts,
        "Failed, routes are not imported from RR1 with modified RT list",
    )

    step("Check if 192.0.2.0/24 is imported to vrf Service from vrf Customer")

    def _bgp_check_imported_local_routes_from_vrf_service():
        rib = _vtysh_json(pe1, "show ip route vrf Service 192.0.2.0/24 json")
        wanted_nexthop = {
            "fib": True,
            "vrf": "Customer",
            "active": True,
        }
        wanted_entry = {
            "vrfName": "Service",
            "table": 1002,
            "installed": True,
            "selected": True,
            "nexthops": [wanted_nexthop],
        }
        return topotest.json_cmp(rib, {"192.0.2.0/24": [wanted_entry]})

    _expect_match(
        _bgp_check_imported_local_routes_from_vrf_service,
        "Failed, didn't import local route 192.0.2.0/24 from vrf Customer to vrf Service",
    )

    step("Check if 172.16.255.1/32 is announced to CE2")

    def _bgp_check_received_routes_from_pe():
        rib = _vtysh_json(ce2, "show ip route 172.16.255.1/32 json")
        wanted_entry = {
            "protocol": "bgp",
            "installed": True,
            "nexthops": [{"ip": "192.168.2.2"}],
        }
        return topotest.json_cmp(rib, {"172.16.255.1/32": [wanted_entry]})

    _expect_match(
        _bgp_check_received_routes_from_pe,
        "Failed, didn't receive 172.16.255.1/32 from PE1",
    )
46dbf9d0
DA
212
213
if __name__ == "__main__":
    # Allow running this file directly: forward any extra command-line
    # arguments to pytest, always prepending "-s".
    pytest_args = ["-s"]
    pytest_args.extend(sys.argv[1:])
    sys.exit(pytest.main(pytest_args))