]> git.proxmox.com Git - mirror_frr.git/blame - tests/topotests/bgp_prefix_sid/test_bgp_prefix_sid.py
Merge pull request #6455 from GalaxyGorilla/bfd_ospf_topotest
[mirror_frr.git] / tests / topotests / bgp_prefix_sid / test_bgp_prefix_sid.py
CommitLineData
893799b0
HS
1#!/usr/bin/env python
2
3#
4# test_bgp_prefix_sid.py
5# Part of NetDEF Topology Tests
6#
7# Copyright (c) 2020 by LINE Corporation
8# Copyright (c) 2020 by Hiroki Shirokura <slank.dev@gmail.com>
9#
10# Permission to use, copy, modify, and/or distribute this software
11# for any purpose with or without fee is hereby granted, provided
12# that the above copyright notice and this permission notice appear
13# in all copies.
14#
15# THE SOFTWARE IS PROVIDED "AS IS" AND NETDEF DISCLAIMS ALL WARRANTIES
16# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
17# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NETDEF BE LIABLE FOR
18# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY
19# DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS,
20# WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
21# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE
22# OF THIS SOFTWARE.
23#
24
25"""
26test_bgp_prefix_sid.py: Test BGP topology with EBGP on prefix-sid
27"""
28
29import json
30import os
31import sys
32import functools
33import pytest
34
35CWD = os.path.dirname(os.path.realpath(__file__))
787e7624 36sys.path.append(os.path.join(CWD, "../"))
893799b0
HS
37
38# pylint: disable=C0413
39from lib import topotest
40from lib.topogen import Topogen, TopoRouter, get_topogen
41from lib.topolog import logger
42from mininet.topo import Topo
43
44
45class TemplateTopo(Topo):
46 def build(self, **_opts):
47 tgen = get_topogen(self)
787e7624 48 router = tgen.add_router("r1")
49 switch = tgen.add_switch("s1")
893799b0
HS
50 switch.add_link(router)
51
787e7624 52 switch = tgen.gears["s1"]
53 peer1 = tgen.add_exabgp_peer(
54 "peer1", ip="10.0.0.101", defaultRoute="via 10.0.0.1"
55 )
56 peer2 = tgen.add_exabgp_peer(
57 "peer2", ip="10.0.0.102", defaultRoute="via 10.0.0.1"
58 )
893799b0
HS
59 switch.add_link(peer1)
60 switch.add_link(peer2)
61
62
63def setup_module(module):
64 tgen = Topogen(TemplateTopo, module.__name__)
65 tgen.start_topology()
66
787e7624 67 router = tgen.gears["r1"]
68 router.load_config(
69 TopoRouter.RD_ZEBRA, os.path.join(CWD, "{}/zebra.conf".format("r1"))
70 )
71 router.load_config(
72 TopoRouter.RD_BGP, os.path.join(CWD, "{}/bgpd.conf".format("r1"))
73 )
893799b0
HS
74 router.start()
75
787e7624 76 logger.info("starting exaBGP on peer1")
893799b0 77 peer_list = tgen.exabgp_peers()
e5f0ed14 78 for pname, peer in peer_list.items():
893799b0 79 peer_dir = os.path.join(CWD, pname)
787e7624 80 env_file = os.path.join(CWD, "exabgp.env")
81 logger.info("Running ExaBGP peer")
893799b0
HS
82 peer.start(peer_dir, env_file)
83 logger.info(pname)
84
85
86def teardown_module(module):
87 tgen = get_topogen()
88 tgen.stop_topology()
89
90
91def test_r1_receive_and_advertise_prefix_sid_type1():
92 tgen = get_topogen()
787e7624 93 router = tgen.gears["r1"]
893799b0
HS
94
95 def _check_type1_r1(router, prefix, remoteLabel, labelIndex):
787e7624 96 output = router.vtysh_cmd(
97 "show bgp ipv4 labeled-unicast {} json".format(prefix)
98 )
893799b0
HS
99 output = json.loads(output)
100 expected = {
787e7624 101 "prefix": prefix,
102 "advertisedTo": {"10.0.0.101": {}, "10.0.0.102": {}},
103 "paths": [
9fa6ec14 104 {
105 "valid": True,
106 "remoteLabel": remoteLabel,
107 "labelIndex": labelIndex,
108 }
787e7624 109 ],
893799b0
HS
110 }
111 return topotest.json_cmp(output, expected)
112
787e7624 113 test_func = functools.partial(_check_type1_r1, router, "3.0.0.1/32", 800001, 1)
893799b0
HS
114 success, result = topotest.run_and_expect(test_func, None, count=10, wait=0.5)
115 assert result is None, 'Failed _check_type1_r1 in "{}"'.format(router)
116
787e7624 117 test_func = functools.partial(_check_type1_r1, router, "3.0.0.2/32", 800002, 2)
893799b0
HS
118 success, result = topotest.run_and_expect(test_func, None, count=10, wait=0.5)
119 assert result is None, 'Failed _check_type1_r1 in "{}"'.format(router)
120
121
122def exabgp_get_update_prefix(filename, afi, nexthop, prefix):
787e7624 123 with open("/tmp/peer2-received.log") as f:
893799b0
HS
124 for line in f.readlines():
125 output = json.loads(line)
787e7624 126 ret = output.get("neighbor")
893799b0
HS
127 if ret is None:
128 continue
787e7624 129 ret = ret.get("message")
893799b0
HS
130 if ret is None:
131 continue
787e7624 132 ret = ret.get("update")
893799b0
HS
133 if ret is None:
134 continue
787e7624 135 ret = ret.get("announce")
893799b0
HS
136 if ret is None:
137 continue
138 ret = ret.get(afi)
139 if ret is None:
140 continue
141 ret = ret.get(nexthop)
142 if ret is None:
143 continue
144 ret = ret.get(prefix)
145 if ret is None:
146 continue
147 return output
148 return "Not found"
149
150
151def test_peer2_receive_prefix_sid_type1():
152 tgen = get_topogen()
787e7624 153 peer2 = tgen.gears["peer2"]
893799b0
HS
154
155 def _check_type1_peer2(prefix, labelindex):
787e7624 156 output = exabgp_get_update_prefix(
157 "/tmp/peer2-received.log", "ipv4 nlri-mpls", "10.0.0.101", prefix
158 )
893799b0 159 expected = {
787e7624 160 "type": "update",
161 "neighbor": {
162 "ip": "10.0.0.1",
163 "message": {
164 "update": {
165 "attribute": {
166 "attribute-0x28-0xE0": "0x010007000000{:08x}".format(
167 labelindex
168 )
893799b0 169 },
787e7624 170 "announce": {"ipv4 nlri-mpls": {"10.0.0.101": {}}},
893799b0 171 }
787e7624 172 },
173 },
893799b0
HS
174 }
175 return topotest.json_cmp(output, expected)
176
787e7624 177 test_func = functools.partial(_check_type1_peer2, "3.0.0.1/32", labelindex=1)
893799b0 178 success, result = topotest.run_and_expect(test_func, None, count=10, wait=0.5)
787e7624 179 assert result is None, 'Failed _check_type1_peer2 in "{}"'.format("peer2")
893799b0 180
787e7624 181 test_func = functools.partial(_check_type1_peer2, "3.0.0.2/32", labelindex=2)
893799b0 182 success, result = topotest.run_and_expect(test_func, None, count=10, wait=0.5)
787e7624 183 assert result is None, 'Failed _check_type1_peer2 in "{}"'.format("peer2")
893799b0
HS
184
185
787e7624 186if __name__ == "__main__":
893799b0
HS
187 args = ["-s"] + sys.argv[1:]
188 ret = pytest.main(args)
189 sys.exit(ret)