]>
Commit | Line | Data |
---|---|---|
3130cfd7 | 1 | #!/usr/bin/env python |
acddc0ed | 2 | # SPDX-License-Identifier: ISC |
3130cfd7 DA |
3 | |
4 | # | |
5 | # Copyright (c) 2021 by | |
6 | # Donatas Abraitis <donatas.abraitis@gmail.com> | |
7 | # | |
3130cfd7 DA |
8 | |
9 | """ | |
10 | Test if 172.16.255.254/32 tagged with BLACKHOLE community is not | |
a5de4a56 | 11 | re-advertised downstream outside local AS. |
3130cfd7 DA |
12 | """ |
13 | ||
14 | import os | |
15 | import sys | |
16 | import json | |
17 | import pytest | |
18 | import functools | |
19 | ||
20 | CWD = os.path.dirname(os.path.realpath(__file__)) | |
21 | sys.path.append(os.path.join(CWD, "../")) | |
22 | ||
23 | # pylint: disable=C0413 | |
24 | from lib import topotest | |
25 | from lib.topogen import Topogen, TopoRouter, get_topogen | |
a5de4a56 | 26 | from lib.common_config import step |
3130cfd7 | 27 | |
5ad1fd54 DS |
28 | pytestmark = [pytest.mark.bgpd] |
29 | ||
3130cfd7 | 30 | |
e82b531d CH |
def build_topo(tgen):
    """Create the routers and switches for this test.

    Adds routers r1..r4 and three switches, each joining r2 to one other
    router: s1 (r1, r2), s2 (r2, r3) and s3 (r2, r4).
    """
    for index in range(1, 5):
        tgen.add_router("r{}".format(index))

    # (switch name, routers attached to it), in creation order.
    wiring = (
        ("s1", ("r1", "r2")),
        ("s2", ("r2", "r3")),
        ("s3", ("r2", "r4")),
    )
    for switch_name, members in wiring:
        switch = tgen.add_switch(switch_name)
        for member in members:
            switch.add_link(tgen.gears[member])
a5de4a56 | 46 | |
3130cfd7 DA |
47 | |
def setup_module(mod):
    """Build and start the topology, then load daemon configs and start routers.

    Args:
        mod: the test module object; its ``__name__`` is passed to Topogen.
    """
    tgen = Topogen(build_topo, mod.__name__)
    tgen.start_topology()

    # Config files live in a per-router sub-directory next to this file:
    # <CWD>/<router name>/zebra.conf and <CWD>/<router name>/bgpd.conf.
    for rname, router in tgen.routers().items():
        router.load_config(
            TopoRouter.RD_ZEBRA, os.path.join(CWD, "{}/zebra.conf".format(rname))
        )
        router.load_config(
            TopoRouter.RD_BGP, os.path.join(CWD, "{}/bgpd.conf".format(rname))
        )

    tgen.start_router()
63 | ||
64 | ||
def teardown_module(mod):
    """Stop the currently active topology (``mod`` is unused)."""
    get_topogen().stop_topology()
68 | ||
69 | ||
def test_bgp_blackhole_community():
    """Verify how r2 handles 172.16.255.254/32 tagged with BLACKHOLE.

    Three checks are polled in order on r2, each until its comparison
    returns None (the value run_and_expect is told to expect):

    1. The prefix is present with the "blackhole" and "noExport" communities.
    2. Nothing is advertised to the neighbor on r2-eth1 (the eBGP check).
    3. The prefix is advertised to the neighbor on r2-eth2 (the iBGP check).

    The test is skipped if any router already reported a failure.
    """
    tgen = get_topogen()

    if tgen.routers_have_failure():
        pytest.skip(tgen.errors)

    # All checks run against r2; look it up once.
    r2 = tgen.gears["r2"]

    def _bgp_converge():
        # r2 must hold the prefix carrying both communities.
        output = json.loads(r2.vtysh_cmd("show ip bgp 172.16.255.254/32 json"))
        expected = {"paths": [{"community": {"list": ["blackhole", "noExport"]}}]}
        return topotest.json_cmp(output, expected)

    def _bgp_no_advertise_ebgp():
        # Nothing at all may be advertised (or filtered) towards r2-eth1.
        output = json.loads(
            r2.vtysh_cmd("show ip bgp neighbor r2-eth1 advertised-routes json")
        )
        expected = {
            "advertisedRoutes": {},
            "totalPrefixCounter": 0,
            "filteredPrefixCounter": 0,
        }
        return topotest.json_cmp(output, expected)

    def _bgp_advertise_ibgp():
        # The blackhole prefix must still be advertised towards r2-eth2.
        output = json.loads(
            r2.vtysh_cmd("show ip bgp neighbor r2-eth2 advertised-routes json")
        )
        expected = {
            "advertisedRoutes": {"172.16.255.254/32": {}},
            "totalPrefixCounter": 2,
        }
        return topotest.json_cmp(output, expected)

    test_func = functools.partial(_bgp_converge)
    _, result = topotest.run_and_expect(test_func, None, count=60, wait=0.5)

    assert result is None, 'Failed bgp convergence in "{}"'.format(r2)

    step("Check if 172.16.255.254/32 is not advertised to eBGP peers")

    test_func = functools.partial(_bgp_no_advertise_ebgp)
    _, result = topotest.run_and_expect(test_func, None, count=60, wait=0.5)

    assert (
        result is None
    ), 'Advertised blackhole tagged prefix to eBGP peers in "{}"'.format(r2)

    step("Check if 172.16.255.254/32 is advertised to iBGP peers")

    test_func = functools.partial(_bgp_advertise_ibgp)
    _, result = topotest.run_and_expect(test_func, None, count=60, wait=0.5)

    assert (
        result is None
    ), 'Withdrawn blackhole tagged prefix to iBGP peers in "{}"'.format(r2)
136 | ||
137 | ||
if __name__ == "__main__":
    # Run under pytest when executed directly; "-s" is always passed first,
    # followed by whatever arguments were given on the command line.
    pytest_args = ["-s"]
    pytest_args.extend(sys.argv[1:])
    sys.exit(pytest.main(pytest_args))
140 | sys.exit(pytest.main(args)) |