]> git.proxmox.com Git - mirror_frr.git/blame - tests/topotests/bgp_prefix_sid/test_bgp_prefix_sid.py
Merge pull request #13649 from donaldsharp/unlock_the_node_or_else
[mirror_frr.git] / tests / topotests / bgp_prefix_sid / test_bgp_prefix_sid.py
CommitLineData
893799b0 1#!/usr/bin/env python
acddc0ed 2# SPDX-License-Identifier: ISC
893799b0
HS
3
4#
5# test_bgp_prefix_sid.py
6# Part of NetDEF Topology Tests
7#
8# Copyright (c) 2020 by LINE Corporation
9# Copyright (c) 2020 by Hiroki Shirokura <slank.dev@gmail.com>
10#
893799b0
HS
11
12"""
13test_bgp_prefix_sid.py: Test BGP topology with EBGP on prefix-sid
14"""
15
16import json
17import os
18import sys
19import functools
20import pytest
21
22CWD = os.path.dirname(os.path.realpath(__file__))
787e7624 23sys.path.append(os.path.join(CWD, "../"))
893799b0
HS
24
25# pylint: disable=C0413
26from lib import topotest
27from lib.topogen import Topogen, TopoRouter, get_topogen
28from lib.topolog import logger
893799b0 29
bf3a0a9a
DS
30pytestmark = [pytest.mark.bgpd]
31
893799b0 32
e82b531d
CH
33def build_topo(tgen):
34 router = tgen.add_router("r1")
35 switch = tgen.add_switch("s1")
36 switch.add_link(router)
893799b0 37
e82b531d 38 switch = tgen.gears["s1"]
a53c08bc
CH
39 peer1 = tgen.add_exabgp_peer("peer1", ip="10.0.0.101", defaultRoute="via 10.0.0.1")
40 peer2 = tgen.add_exabgp_peer("peer2", ip="10.0.0.102", defaultRoute="via 10.0.0.1")
e82b531d
CH
41 switch.add_link(peer1)
42 switch.add_link(peer2)
893799b0
HS
43
44
45def setup_module(module):
e82b531d 46 tgen = Topogen(build_topo, module.__name__)
893799b0
HS
47 tgen.start_topology()
48
787e7624 49 router = tgen.gears["r1"]
50 router.load_config(
51 TopoRouter.RD_ZEBRA, os.path.join(CWD, "{}/zebra.conf".format("r1"))
52 )
53 router.load_config(
54 TopoRouter.RD_BGP, os.path.join(CWD, "{}/bgpd.conf".format("r1"))
55 )
893799b0
HS
56 router.start()
57
787e7624 58 logger.info("starting exaBGP on peer1")
893799b0 59 peer_list = tgen.exabgp_peers()
e5f0ed14 60 for pname, peer in peer_list.items():
893799b0 61 peer_dir = os.path.join(CWD, pname)
787e7624 62 env_file = os.path.join(CWD, "exabgp.env")
63 logger.info("Running ExaBGP peer")
893799b0
HS
64 peer.start(peer_dir, env_file)
65 logger.info(pname)
66
67
68def teardown_module(module):
69 tgen = get_topogen()
70 tgen.stop_topology()
71
72
73def test_r1_receive_and_advertise_prefix_sid_type1():
74 tgen = get_topogen()
787e7624 75 router = tgen.gears["r1"]
893799b0
HS
76
77 def _check_type1_r1(router, prefix, remoteLabel, labelIndex):
787e7624 78 output = router.vtysh_cmd(
79 "show bgp ipv4 labeled-unicast {} json".format(prefix)
80 )
893799b0
HS
81 output = json.loads(output)
82 expected = {
787e7624 83 "prefix": prefix,
84 "advertisedTo": {"10.0.0.101": {}, "10.0.0.102": {}},
85 "paths": [
9fa6ec14 86 {
87 "valid": True,
88 "remoteLabel": remoteLabel,
89 "labelIndex": labelIndex,
90 }
787e7624 91 ],
893799b0
HS
92 }
93 return topotest.json_cmp(output, expected)
94
787e7624 95 test_func = functools.partial(_check_type1_r1, router, "3.0.0.1/32", 800001, 1)
893799b0
HS
96 success, result = topotest.run_and_expect(test_func, None, count=10, wait=0.5)
97 assert result is None, 'Failed _check_type1_r1 in "{}"'.format(router)
98
787e7624 99 test_func = functools.partial(_check_type1_r1, router, "3.0.0.2/32", 800002, 2)
893799b0
HS
100 success, result = topotest.run_and_expect(test_func, None, count=10, wait=0.5)
101 assert result is None, 'Failed _check_type1_r1 in "{}"'.format(router)
102
103
104def exabgp_get_update_prefix(filename, afi, nexthop, prefix):
f862fe55 105 with open(filename) as f:
893799b0
HS
106 for line in f.readlines():
107 output = json.loads(line)
787e7624 108 ret = output.get("neighbor")
893799b0
HS
109 if ret is None:
110 continue
787e7624 111 ret = ret.get("message")
893799b0
HS
112 if ret is None:
113 continue
787e7624 114 ret = ret.get("update")
893799b0
HS
115 if ret is None:
116 continue
787e7624 117 ret = ret.get("announce")
893799b0
HS
118 if ret is None:
119 continue
120 ret = ret.get(afi)
121 if ret is None:
122 continue
123 ret = ret.get(nexthop)
124 if ret is None:
125 continue
126 ret = ret.get(prefix)
127 if ret is None:
128 continue
129 return output
130 return "Not found"
131
132
133def test_peer2_receive_prefix_sid_type1():
134 tgen = get_topogen()
787e7624 135 peer2 = tgen.gears["peer2"]
f862fe55 136 logfile = "{}/{}-received.log".format(peer2.gearlogdir, peer2.name)
893799b0
HS
137
138 def _check_type1_peer2(prefix, labelindex):
787e7624 139 output = exabgp_get_update_prefix(
f862fe55 140 logfile, "ipv4 nlri-mpls", "10.0.0.101", prefix
787e7624 141 )
893799b0 142 expected = {
787e7624 143 "type": "update",
144 "neighbor": {
145 "ip": "10.0.0.1",
146 "message": {
147 "update": {
148 "attribute": {
149 "attribute-0x28-0xE0": "0x010007000000{:08x}".format(
150 labelindex
151 )
893799b0 152 },
787e7624 153 "announce": {"ipv4 nlri-mpls": {"10.0.0.101": {}}},
893799b0 154 }
787e7624 155 },
156 },
893799b0
HS
157 }
158 return topotest.json_cmp(output, expected)
159
787e7624 160 test_func = functools.partial(_check_type1_peer2, "3.0.0.1/32", labelindex=1)
893799b0 161 success, result = topotest.run_and_expect(test_func, None, count=10, wait=0.5)
787e7624 162 assert result is None, 'Failed _check_type1_peer2 in "{}"'.format("peer2")
893799b0 163
787e7624 164 test_func = functools.partial(_check_type1_peer2, "3.0.0.2/32", labelindex=2)
893799b0 165 success, result = topotest.run_and_expect(test_func, None, count=10, wait=0.5)
787e7624 166 assert result is None, 'Failed _check_type1_peer2 in "{}"'.format("peer2")
893799b0
HS
167
168
787e7624 169if __name__ == "__main__":
893799b0
HS
170 args = ["-s"] + sys.argv[1:]
171 ret = pytest.main(args)
172 sys.exit(ret)