git.proxmox.com Git - mirror_frr.git/blob - tests/topotests/bgp_prefix_sid/test_bgp_prefix_sid.py
Merge pull request #9561 from idryzhov/bgp-no-router-vrf-default
[mirror_frr.git] / tests / topotests / bgp_prefix_sid / test_bgp_prefix_sid.py
1 #!/usr/bin/env python
2
3 #
4 # test_bgp_prefix_sid.py
5 # Part of NetDEF Topology Tests
6 #
7 # Copyright (c) 2020 by LINE Corporation
8 # Copyright (c) 2020 by Hiroki Shirokura <slank.dev@gmail.com>
9 #
10 # Permission to use, copy, modify, and/or distribute this software
11 # for any purpose with or without fee is hereby granted, provided
12 # that the above copyright notice and this permission notice appear
13 # in all copies.
14 #
15 # THE SOFTWARE IS PROVIDED "AS IS" AND NETDEF DISCLAIMS ALL WARRANTIES
16 # WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
17 # MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NETDEF BE LIABLE FOR
18 # ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY
19 # DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS,
20 # WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
21 # ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE
22 # OF THIS SOFTWARE.
23 #
24
25 """
26 test_bgp_prefix_sid.py: Test BGP topology with EBGP on prefix-sid
27 """
28
29 import json
30 import os
31 import sys
32 import functools
33 import pytest
34
# Absolute directory of this test file (symlinks resolved); used below to
# locate the per-node configuration files.
CWD = os.path.dirname(os.path.realpath(__file__))
# Put the parent directory on the import path -- presumably so that the
# `lib` package imported further down resolves; confirm against repo layout.
sys.path.append(os.path.join(CWD, "../"))
37
38 # pylint: disable=C0413
39 from lib import topotest
40 from lib.topogen import Topogen, TopoRouter, get_topogen
41 from lib.topolog import logger
42
# Module-level pytest marker list: tags this module's tests with `bgpd`.
pytestmark = [pytest.mark.bgpd]
44
45
def build_topo(tgen):
    """Populate *tgen* with router r1, switch s1 and two ExaBGP peers.

    r1, peer1 (10.0.0.101) and peer2 (10.0.0.102) are all linked to s1.
    Both peers are created with the default route "via 10.0.0.1".
    """
    r1 = tgen.add_router("r1")
    tgen.add_switch("s1").add_link(r1)

    # The switch is fetched again from the gear table, as the original did.
    s1 = tgen.gears["s1"]
    peer_specs = (("peer1", "10.0.0.101"), ("peer2", "10.0.0.102"))
    exabgp_peers = [
        tgen.add_exabgp_peer(name, ip=address, defaultRoute="via 10.0.0.1")
        for name, address in peer_specs
    ]
    # Link only after both peers exist, matching the original call order.
    for exabgp_peer in exabgp_peers:
        s1.add_link(exabgp_peer)
56
57
def setup_module(module):
    """Bring up the topology, router r1 and every ExaBGP peer.

    r1 loads ``r1/zebra.conf`` and ``r1/bgpd.conf`` from the test directory.
    Each ExaBGP peer is started with its own directory ``CWD/<peer name>``
    and the shared ``exabgp.env`` file.
    """
    tgen = Topogen(build_topo, module.__name__)
    tgen.start_topology()

    r1 = tgen.gears["r1"]
    daemon_configs = (
        (TopoRouter.RD_ZEBRA, "{}/zebra.conf"),
        (TopoRouter.RD_BGP, "{}/bgpd.conf"),
    )
    for daemon, conf_template in daemon_configs:
        r1.load_config(daemon, os.path.join(CWD, conf_template.format("r1")))
    r1.start()

    logger.info("starting exaBGP on peer1")
    # The environment file is the same for every peer, so build its path once.
    env_file = os.path.join(CWD, "exabgp.env")
    for pname, peer in tgen.exabgp_peers().items():
        logger.info("Running ExaBGP peer")
        peer.start(os.path.join(CWD, pname), env_file)
        logger.info(pname)
79
80
def teardown_module(module):
    """Stop the topology returned by ``get_topogen()``."""
    get_topogen().stop_topology()
84
85
def test_r1_receive_and_advertise_prefix_sid_type1():
    """Check r1's labeled-unicast entries for 3.0.0.1/32 and 3.0.0.2/32.

    For each prefix, ``show bgp ipv4 labeled-unicast <prefix> json`` on r1 is
    polled through ``topotest.run_and_expect`` (count=10, wait=0.5). The
    output must show the prefix advertised to both ExaBGP peers with a valid
    path carrying the expected remote label and label index.
    """
    r1 = get_topogen().gears["r1"]

    def _check_type1_r1(router, prefix, remoteLabel, labelIndex):
        # Returns whatever topotest.json_cmp returns; the assertions below
        # treat None as "matched".
        command = "show bgp ipv4 labeled-unicast {} json".format(prefix)
        actual = json.loads(router.vtysh_cmd(command))
        wanted_path = {
            "valid": True,
            "remoteLabel": remoteLabel,
            "labelIndex": labelIndex,
        }
        expected = {
            "prefix": prefix,
            "advertisedTo": {"10.0.0.101": {}, "10.0.0.102": {}},
            "paths": [wanted_path],
        }
        return topotest.json_cmp(actual, expected)

    cases = (
        ("3.0.0.1/32", 800001, 1),
        ("3.0.0.2/32", 800002, 2),
    )
    for prefix, remote_label, label_index in cases:
        check = functools.partial(
            _check_type1_r1, r1, prefix, remote_label, label_index
        )
        _, result = topotest.run_and_expect(check, None, count=10, wait=0.5)
        assert result is None, 'Failed _check_type1_r1 in "{}"'.format(r1)
115
116
def exabgp_get_update_prefix(filename, afi, nexthop, prefix):
    """Find the first logged ExaBGP message that announces *prefix*.

    *filename* is read as one JSON document per line. A line matches when
    ``doc["neighbor"]["message"]["update"]["announce"][afi][nexthop][prefix]``
    exists and is not None.

    The log may still be being written while this function is polled, so
    lines that cannot be used are skipped instead of raising: blank lines,
    lines that are not valid JSON (e.g. a partially written last line), and
    documents where an intermediate value on the path is not a dict.

    Args:
        filename: path of the ExaBGP received-message log.
        afi: key under "announce", e.g. "ipv4 nlri-mpls".
        nexthop: next-hop key under *afi*.
        prefix: prefix key under *nexthop*.

    Returns:
        The whole decoded JSON document of the first matching line, or the
        string "Not found" when no line matches.

    Raises:
        IOError/OSError: if *filename* cannot be opened. This is deliberately
            not swallowed, so a wrong path is not mistaken for "no update
            received yet".
    """
    key_path = ("neighbor", "message", "update", "announce", afi, nexthop, prefix)

    with open(filename) as f:
        # Iterate lazily; there is no need to load the whole log in memory.
        for line in f:
            line = line.strip()
            if not line:
                continue
            try:
                output = json.loads(line)
            except ValueError:
                # json.JSONDecodeError subclasses ValueError; an incomplete
                # line will be complete on a later poll.
                continue

            node = output
            for key in key_path:
                if not isinstance(node, dict):
                    node = None
                    break
                node = node.get(key)
                if node is None:
                    break

            if node is not None:
                return output

    return "Not found"
144
145
def test_peer2_receive_prefix_sid_type1():
    """Check that peer2 logged updates from 10.0.0.1 for both test prefixes.

    peer2's ``<gearlogdir>/<name>-received.log`` is searched for an
    "ipv4 nlri-mpls" announcement of each prefix with next hop 10.0.0.101.
    The matching message must carry ``attribute-0x28-0xE0`` whose value ends
    in the expected label index as 8 hex digits. The search is polled
    through ``topotest.run_and_expect`` (count=10, wait=0.5).
    """
    peer = get_topogen().gears["peer2"]
    received_log = "{}/{}-received.log".format(peer.gearlogdir, peer.name)

    def _check_type1_peer2(prefix, labelindex):
        update = exabgp_get_update_prefix(
            received_log, "ipv4 nlri-mpls", "10.0.0.101", prefix
        )
        # NOTE(review): presumably the Label-Index TLV (type 1) of the BGP
        # Prefix-SID attribute (RFC 8669) -- confirm against the RFC.
        prefix_sid_value = "0x010007000000{:08x}".format(labelindex)
        expected_update = {
            "attribute": {"attribute-0x28-0xE0": prefix_sid_value},
            "announce": {"ipv4 nlri-mpls": {"10.0.0.101": {}}},
        }
        expected = {
            "type": "update",
            "neighbor": {
                "ip": "10.0.0.1",
                "message": {"update": expected_update},
            },
        }
        return topotest.json_cmp(update, expected)

    for prefix, index in (("3.0.0.1/32", 1), ("3.0.0.2/32", 2)):
        check = functools.partial(_check_type1_peer2, prefix, labelindex=index)
        _, result = topotest.run_and_expect(check, None, count=10, wait=0.5)
        assert result is None, 'Failed _check_type1_peer2 in "{}"'.format("peer2")
180
181
if __name__ == "__main__":
    # Run pytest with "-s" placed ahead of any command-line arguments, and
    # use pytest's return value as the process exit status.
    sys.exit(pytest.main(["-s"] + sys.argv[1:]))