]> git.proxmox.com Git - mirror_frr.git/blob - tests/topotests/bgp_prefix_sid/test_bgp_prefix_sid.py
Merge pull request #6455 from GalaxyGorilla/bfd_ospf_topotest
[mirror_frr.git] / tests / topotests / bgp_prefix_sid / test_bgp_prefix_sid.py
1 #!/usr/bin/env python
2
3 #
4 # test_bgp_prefix_sid.py
5 # Part of NetDEF Topology Tests
6 #
7 # Copyright (c) 2020 by LINE Corporation
8 # Copyright (c) 2020 by Hiroki Shirokura <slank.dev@gmail.com>
9 #
10 # Permission to use, copy, modify, and/or distribute this software
11 # for any purpose with or without fee is hereby granted, provided
12 # that the above copyright notice and this permission notice appear
13 # in all copies.
14 #
15 # THE SOFTWARE IS PROVIDED "AS IS" AND NETDEF DISCLAIMS ALL WARRANTIES
16 # WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
17 # MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NETDEF BE LIABLE FOR
18 # ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY
19 # DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS,
20 # WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
21 # ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE
22 # OF THIS SOFTWARE.
23 #
24
25 """
26 test_bgp_prefix_sid.py: Test BGP topology with EBGP on prefix-sid
27 """
28
29 import json
30 import os
31 import sys
32 import functools
33 import pytest
34
35 CWD = os.path.dirname(os.path.realpath(__file__))
36 sys.path.append(os.path.join(CWD, "../"))
37
38 # pylint: disable=C0413
39 from lib import topotest
40 from lib.topogen import Topogen, TopoRouter, get_topogen
41 from lib.topolog import logger
42 from mininet.topo import Topo
43
44
45 class TemplateTopo(Topo):
46 def build(self, **_opts):
47 tgen = get_topogen(self)
48 router = tgen.add_router("r1")
49 switch = tgen.add_switch("s1")
50 switch.add_link(router)
51
52 switch = tgen.gears["s1"]
53 peer1 = tgen.add_exabgp_peer(
54 "peer1", ip="10.0.0.101", defaultRoute="via 10.0.0.1"
55 )
56 peer2 = tgen.add_exabgp_peer(
57 "peer2", ip="10.0.0.102", defaultRoute="via 10.0.0.1"
58 )
59 switch.add_link(peer1)
60 switch.add_link(peer2)
61
62
63 def setup_module(module):
64 tgen = Topogen(TemplateTopo, module.__name__)
65 tgen.start_topology()
66
67 router = tgen.gears["r1"]
68 router.load_config(
69 TopoRouter.RD_ZEBRA, os.path.join(CWD, "{}/zebra.conf".format("r1"))
70 )
71 router.load_config(
72 TopoRouter.RD_BGP, os.path.join(CWD, "{}/bgpd.conf".format("r1"))
73 )
74 router.start()
75
76 logger.info("starting exaBGP on peer1")
77 peer_list = tgen.exabgp_peers()
78 for pname, peer in peer_list.items():
79 peer_dir = os.path.join(CWD, pname)
80 env_file = os.path.join(CWD, "exabgp.env")
81 logger.info("Running ExaBGP peer")
82 peer.start(peer_dir, env_file)
83 logger.info(pname)
84
85
86 def teardown_module(module):
87 tgen = get_topogen()
88 tgen.stop_topology()
89
90
91 def test_r1_receive_and_advertise_prefix_sid_type1():
92 tgen = get_topogen()
93 router = tgen.gears["r1"]
94
95 def _check_type1_r1(router, prefix, remoteLabel, labelIndex):
96 output = router.vtysh_cmd(
97 "show bgp ipv4 labeled-unicast {} json".format(prefix)
98 )
99 output = json.loads(output)
100 expected = {
101 "prefix": prefix,
102 "advertisedTo": {"10.0.0.101": {}, "10.0.0.102": {}},
103 "paths": [
104 {
105 "valid": True,
106 "remoteLabel": remoteLabel,
107 "labelIndex": labelIndex,
108 }
109 ],
110 }
111 return topotest.json_cmp(output, expected)
112
113 test_func = functools.partial(_check_type1_r1, router, "3.0.0.1/32", 800001, 1)
114 success, result = topotest.run_and_expect(test_func, None, count=10, wait=0.5)
115 assert result is None, 'Failed _check_type1_r1 in "{}"'.format(router)
116
117 test_func = functools.partial(_check_type1_r1, router, "3.0.0.2/32", 800002, 2)
118 success, result = topotest.run_and_expect(test_func, None, count=10, wait=0.5)
119 assert result is None, 'Failed _check_type1_r1 in "{}"'.format(router)
120
121
122 def exabgp_get_update_prefix(filename, afi, nexthop, prefix):
123 with open("/tmp/peer2-received.log") as f:
124 for line in f.readlines():
125 output = json.loads(line)
126 ret = output.get("neighbor")
127 if ret is None:
128 continue
129 ret = ret.get("message")
130 if ret is None:
131 continue
132 ret = ret.get("update")
133 if ret is None:
134 continue
135 ret = ret.get("announce")
136 if ret is None:
137 continue
138 ret = ret.get(afi)
139 if ret is None:
140 continue
141 ret = ret.get(nexthop)
142 if ret is None:
143 continue
144 ret = ret.get(prefix)
145 if ret is None:
146 continue
147 return output
148 return "Not found"
149
150
151 def test_peer2_receive_prefix_sid_type1():
152 tgen = get_topogen()
153 peer2 = tgen.gears["peer2"]
154
155 def _check_type1_peer2(prefix, labelindex):
156 output = exabgp_get_update_prefix(
157 "/tmp/peer2-received.log", "ipv4 nlri-mpls", "10.0.0.101", prefix
158 )
159 expected = {
160 "type": "update",
161 "neighbor": {
162 "ip": "10.0.0.1",
163 "message": {
164 "update": {
165 "attribute": {
166 "attribute-0x28-0xE0": "0x010007000000{:08x}".format(
167 labelindex
168 )
169 },
170 "announce": {"ipv4 nlri-mpls": {"10.0.0.101": {}}},
171 }
172 },
173 },
174 }
175 return topotest.json_cmp(output, expected)
176
177 test_func = functools.partial(_check_type1_peer2, "3.0.0.1/32", labelindex=1)
178 success, result = topotest.run_and_expect(test_func, None, count=10, wait=0.5)
179 assert result is None, 'Failed _check_type1_peer2 in "{}"'.format("peer2")
180
181 test_func = functools.partial(_check_type1_peer2, "3.0.0.2/32", labelindex=2)
182 success, result = topotest.run_and_expect(test_func, None, count=10, wait=0.5)
183 assert result is None, 'Failed _check_type1_peer2 in "{}"'.format("peer2")
184
185
186 if __name__ == "__main__":
187 args = ["-s"] + sys.argv[1:]
188 ret = pytest.main(args)
189 sys.exit(ret)