]> git.proxmox.com Git - mirror_frr.git/blame - tests/topotests/bgp_prefix_sid/test_bgp_prefix_sid.py
Merge pull request #9561 from idryzhov/bgp-no-router-vrf-default
[mirror_frr.git] / tests / topotests / bgp_prefix_sid / test_bgp_prefix_sid.py
CommitLineData
893799b0
HS
1#!/usr/bin/env python
2
3#
4# test_bgp_prefix_sid.py
5# Part of NetDEF Topology Tests
6#
7# Copyright (c) 2020 by LINE Corporation
8# Copyright (c) 2020 by Hiroki Shirokura <slank.dev@gmail.com>
9#
10# Permission to use, copy, modify, and/or distribute this software
11# for any purpose with or without fee is hereby granted, provided
12# that the above copyright notice and this permission notice appear
13# in all copies.
14#
15# THE SOFTWARE IS PROVIDED "AS IS" AND NETDEF DISCLAIMS ALL WARRANTIES
16# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
17# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NETDEF BE LIABLE FOR
18# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY
19# DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS,
20# WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
21# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE
22# OF THIS SOFTWARE.
23#
24
25"""
26test_bgp_prefix_sid.py: Test BGP topology with EBGP on prefix-sid
27"""
28
29import json
30import os
31import sys
32import functools
33import pytest
34
35CWD = os.path.dirname(os.path.realpath(__file__))
787e7624 36sys.path.append(os.path.join(CWD, "../"))
893799b0
HS
37
38# pylint: disable=C0413
39from lib import topotest
40from lib.topogen import Topogen, TopoRouter, get_topogen
41from lib.topolog import logger
893799b0 42
bf3a0a9a
DS
43pytestmark = [pytest.mark.bgpd]
44
893799b0 45
e82b531d
CH
46def build_topo(tgen):
47 router = tgen.add_router("r1")
48 switch = tgen.add_switch("s1")
49 switch.add_link(router)
893799b0 50
e82b531d 51 switch = tgen.gears["s1"]
a53c08bc
CH
52 peer1 = tgen.add_exabgp_peer("peer1", ip="10.0.0.101", defaultRoute="via 10.0.0.1")
53 peer2 = tgen.add_exabgp_peer("peer2", ip="10.0.0.102", defaultRoute="via 10.0.0.1")
e82b531d
CH
54 switch.add_link(peer1)
55 switch.add_link(peer2)
893799b0
HS
56
57
58def setup_module(module):
e82b531d 59 tgen = Topogen(build_topo, module.__name__)
893799b0
HS
60 tgen.start_topology()
61
787e7624 62 router = tgen.gears["r1"]
63 router.load_config(
64 TopoRouter.RD_ZEBRA, os.path.join(CWD, "{}/zebra.conf".format("r1"))
65 )
66 router.load_config(
67 TopoRouter.RD_BGP, os.path.join(CWD, "{}/bgpd.conf".format("r1"))
68 )
893799b0
HS
69 router.start()
70
787e7624 71 logger.info("starting exaBGP on peer1")
893799b0 72 peer_list = tgen.exabgp_peers()
e5f0ed14 73 for pname, peer in peer_list.items():
893799b0 74 peer_dir = os.path.join(CWD, pname)
787e7624 75 env_file = os.path.join(CWD, "exabgp.env")
76 logger.info("Running ExaBGP peer")
893799b0
HS
77 peer.start(peer_dir, env_file)
78 logger.info(pname)
79
80
81def teardown_module(module):
82 tgen = get_topogen()
83 tgen.stop_topology()
84
85
86def test_r1_receive_and_advertise_prefix_sid_type1():
87 tgen = get_topogen()
787e7624 88 router = tgen.gears["r1"]
893799b0
HS
89
90 def _check_type1_r1(router, prefix, remoteLabel, labelIndex):
787e7624 91 output = router.vtysh_cmd(
92 "show bgp ipv4 labeled-unicast {} json".format(prefix)
93 )
893799b0
HS
94 output = json.loads(output)
95 expected = {
787e7624 96 "prefix": prefix,
97 "advertisedTo": {"10.0.0.101": {}, "10.0.0.102": {}},
98 "paths": [
9fa6ec14 99 {
100 "valid": True,
101 "remoteLabel": remoteLabel,
102 "labelIndex": labelIndex,
103 }
787e7624 104 ],
893799b0
HS
105 }
106 return topotest.json_cmp(output, expected)
107
787e7624 108 test_func = functools.partial(_check_type1_r1, router, "3.0.0.1/32", 800001, 1)
893799b0
HS
109 success, result = topotest.run_and_expect(test_func, None, count=10, wait=0.5)
110 assert result is None, 'Failed _check_type1_r1 in "{}"'.format(router)
111
787e7624 112 test_func = functools.partial(_check_type1_r1, router, "3.0.0.2/32", 800002, 2)
893799b0
HS
113 success, result = topotest.run_and_expect(test_func, None, count=10, wait=0.5)
114 assert result is None, 'Failed _check_type1_r1 in "{}"'.format(router)
115
116
117def exabgp_get_update_prefix(filename, afi, nexthop, prefix):
f862fe55 118 with open(filename) as f:
893799b0
HS
119 for line in f.readlines():
120 output = json.loads(line)
787e7624 121 ret = output.get("neighbor")
893799b0
HS
122 if ret is None:
123 continue
787e7624 124 ret = ret.get("message")
893799b0
HS
125 if ret is None:
126 continue
787e7624 127 ret = ret.get("update")
893799b0
HS
128 if ret is None:
129 continue
787e7624 130 ret = ret.get("announce")
893799b0
HS
131 if ret is None:
132 continue
133 ret = ret.get(afi)
134 if ret is None:
135 continue
136 ret = ret.get(nexthop)
137 if ret is None:
138 continue
139 ret = ret.get(prefix)
140 if ret is None:
141 continue
142 return output
143 return "Not found"
144
145
146def test_peer2_receive_prefix_sid_type1():
147 tgen = get_topogen()
787e7624 148 peer2 = tgen.gears["peer2"]
f862fe55 149 logfile = "{}/{}-received.log".format(peer2.gearlogdir, peer2.name)
893799b0
HS
150
151 def _check_type1_peer2(prefix, labelindex):
787e7624 152 output = exabgp_get_update_prefix(
f862fe55 153 logfile, "ipv4 nlri-mpls", "10.0.0.101", prefix
787e7624 154 )
893799b0 155 expected = {
787e7624 156 "type": "update",
157 "neighbor": {
158 "ip": "10.0.0.1",
159 "message": {
160 "update": {
161 "attribute": {
162 "attribute-0x28-0xE0": "0x010007000000{:08x}".format(
163 labelindex
164 )
893799b0 165 },
787e7624 166 "announce": {"ipv4 nlri-mpls": {"10.0.0.101": {}}},
893799b0 167 }
787e7624 168 },
169 },
893799b0
HS
170 }
171 return topotest.json_cmp(output, expected)
172
787e7624 173 test_func = functools.partial(_check_type1_peer2, "3.0.0.1/32", labelindex=1)
893799b0 174 success, result = topotest.run_and_expect(test_func, None, count=10, wait=0.5)
787e7624 175 assert result is None, 'Failed _check_type1_peer2 in "{}"'.format("peer2")
893799b0 176
787e7624 177 test_func = functools.partial(_check_type1_peer2, "3.0.0.2/32", labelindex=2)
893799b0 178 success, result = topotest.run_and_expect(test_func, None, count=10, wait=0.5)
787e7624 179 assert result is None, 'Failed _check_type1_peer2 in "{}"'.format("peer2")
893799b0
HS
180
181
787e7624 182if __name__ == "__main__":
893799b0
HS
183 args = ["-s"] + sys.argv[1:]
184 ret = pytest.main(args)
185 sys.exit(ret)