]>
Commit | Line | Data |
---|---|---|
d864dd9e | 1 | #!/usr/bin/python |
acddc0ed | 2 | # SPDX-License-Identifier: ISC |
d864dd9e EB |
3 | # |
4 | # test_bgp_roles_capability.py | |
5 | # Part of NetDEF Topology Tests | |
6 | # | |
7 | # Copyright (c) 2022 by Eugene Bogomazov <eb@qrator.net> | |
8 | # Copyright (c) 2017 by | |
9 | # Network Device Education Foundation, Inc. ("NetDEF") | |
10 | # | |
d864dd9e EB |
11 | |
12 | """ | |
13 | test_bgp_roles_capability: test bgp roles negotiation | |
14 | """ | |
15 | ||
16 | import json | |
17 | import os | |
18 | import sys | |
19 | import functools | |
20 | import pytest | |
d864dd9e EB |
21 | |
22 | CWD = os.path.dirname(os.path.realpath(__file__)) | |
23 | sys.path.append(os.path.join(CWD, "../")) | |
24 | ||
25 | # pylint: disable=C0413 | |
26 | from lib import topotest | |
27 | from lib.topogen import Topogen, TopoRouter, get_topogen | |
28 | from lib.topolog import logger | |
29 | ||
30 | pytestmark = [pytest.mark.bgpd] | |
31 | ||
32 | ||
# Star topology: switch s<N> links the hub r1 to the spoke r<N>, for N = 2..6.
topodef = {
    f"s{spoke}": ("r1", f"r{spoke}")
    for spoke in range(2, 7)
}
d864dd9e EB |
34 | |
35 | ||
@pytest.fixture(scope="module")
def tgen(request):
    """Build, start and finally tear down the topology shared by this module.

    The topology is created from ``topodef`` and named after the requesting
    test module. Every router loads ``zebra.conf`` and ``bgpd.conf`` before
    the routers are started. The running ``Topogen`` instance is yielded to
    the tests and the topology is stopped once the module's tests finish.
    """
    tgen = Topogen(topodef, request.module.__name__)
    tgen.start_topology()

    # Only the router objects are needed here, not their names.
    for router in tgen.routers().values():
        router.load_config(TopoRouter.RD_ZEBRA, "zebra.conf")
        router.load_config(TopoRouter.RD_BGP, "bgpd.conf")

    tgen.start_router()
    yield tgen
    tgen.stop_topology()
47 | ||
48 | ||
@pytest.fixture(autouse=True)
def skip_on_failure(tgen):
    """Skip the current test when the topology already reports a router failure."""
    has_failure = tgen.routers_have_failure()
    if not has_failure:
        return
    pytest.skip("skipped because of previous test failure")
53 | ||
54 | ||
aee05e20 EB |
def find_neighbor_status(router, neighbor_ip):
    """Return the JSON status entry that ``router`` reports for ``neighbor_ip``.

    Runs ``show bgp neighbors <neighbor_ip> json`` through ``router.vtysh_cmd``,
    parses the output as JSON and returns the value stored under the
    ``neighbor_ip`` key. A missing key raises ``KeyError``.
    """
    command = f"show bgp neighbors {neighbor_ip} json"
    raw_output = router.vtysh_cmd(command)
    neighbors = json.loads(raw_output)
    return neighbors[neighbor_ip]
59 | ||
60 | ||
def check_role_mismatch(router, neighbor_ip):
    """Return whether ``router``'s session with ``neighbor_ip`` shows a role mismatch."""
    status = find_neighbor_status(router, neighbor_ip)
    return is_role_mismatch(status)
63 | ||
64 | ||
d864dd9e EB |
def is_role_mismatch(neighbor_status):
    """Tell whether a neighbor status dict describes a Role Mismatch teardown.

    All three conditions must hold: the session is not ``Established``, the
    last error code/subcode is ``"020B"`` (<2, 11>), and the last notification
    reason mentions ``"Role Mismatch"``.
    """
    if neighbor_status["bgpState"] == "Established":
        return False

    # "020B" encodes error code 2, subcode 11.
    if neighbor_status.get("lastErrorCodeSubcode") != "020B":
        return False

    reason = neighbor_status.get("lastNotificationReason", "")
    return "Role Mismatch" in reason
71 | ||
72 | ||
aee05e20 EB |
def check_session_established(router, neighbor_ip):
    """Return whether ``router`` reports ``neighbor_ip`` in the Established state."""
    return find_neighbor_status(router, neighbor_ip)["bgpState"] == "Established"
76 | ||
77 | ||
d864dd9e EB |
def test_correct_pair(tgen):
    """Provider-customer pair: r1's session to 192.168.2.2 (r2) must come up.

    Once Established, r1 must report itself as ``provider``, the neighbor as
    ``customer``, and the role capability as both advertised and received.
    """
    # provider-customer pair
    router = tgen.gears["r1"]
    neighbor_ip = "192.168.2.2"
    check_r2_established = functools.partial(
        check_session_established, router, neighbor_ip
    )
    # Only the success flag is needed; the last polled value is discarded.
    success, _ = topotest.run_and_expect(check_r2_established, True, count=20, wait=3)
    assert success, "Session with r2 is not Established"

    neighbor_status = find_neighbor_status(router, neighbor_ip)
    assert neighbor_status["localRole"] == "provider"
    assert neighbor_status["remoteRole"] == "customer"
    assert (
        neighbor_status["neighborCapabilities"].get("role") == "advertisedAndReceived"
    )
96 | ||
97 | ||
def test_role_pair_mismatch(tgen):
    """Provider-peer mismatch: r3's session to 192.168.3.1 must be torn down.

    Polls r3 until its view of the neighbor is not Established and records a
    Role Mismatch notification (see ``check_role_mismatch``).
    """
    # provider-peer mismatch
    router = tgen.gears["r3"]
    neighbor_ip = "192.168.3.1"
    check_r3_mismatch = functools.partial(check_role_mismatch, router, neighbor_ip)
    # Only the success flag is needed; the last polled value is discarded.
    success, _ = topotest.run_and_expect(check_r3_mismatch, True, count=20, wait=3)
    assert success, "Session between r1 and r3 was not correctly closed"
d864dd9e EB |
105 | |
106 | ||
def test_single_role_advertising(tgen):
    """Provider-undefined pair, seen from the side that sets a role (r1).

    r1's session to 192.168.4.2 (r4) must be Established, with local role
    ``provider``, remote role ``undefined`` and the role capability only
    advertised.
    """
    # provider-undefined pair; we set role
    router = tgen.gears["r1"]
    neighbor_ip = "192.168.4.2"
    check_r4_established = functools.partial(
        check_session_established, router, neighbor_ip
    )
    # Only the success flag is needed; the last polled value is discarded.
    success, _ = topotest.run_and_expect(check_r4_established, True, count=20, wait=3)
    assert success, "Session with r4 is not Established"

    neighbor_status = find_neighbor_status(router, neighbor_ip)
    assert neighbor_status["localRole"] == "provider"
    assert neighbor_status["remoteRole"] == "undefined"
    assert neighbor_status["neighborCapabilities"].get("role") == "advertised"
123 | ||
124 | ||
def test_single_role_receiving(tgen):
    """Provider-undefined pair, seen from the side that only receives a role (r4).

    r4's session to 192.168.4.1 (r1) must be Established, with local role
    ``undefined``, remote role ``provider`` and the role capability only
    received.
    """
    # provider-undefined pair; we receive role
    router = tgen.gears["r4"]
    neighbor_ip = "192.168.4.1"
    check_r1_established = functools.partial(
        check_session_established, router, neighbor_ip
    )
    # Only the success flag is needed; the last polled value is discarded.
    success, _ = topotest.run_and_expect(check_r1_established, True, count=20, wait=3)
    assert success, "Session with r1 is not Established"

    neighbor_status = find_neighbor_status(router, neighbor_ip)
    assert neighbor_status["localRole"] == "undefined"
    assert neighbor_status["remoteRole"] == "provider"
    assert neighbor_status["neighborCapabilities"].get("role") == "received"
141 | ||
142 | ||
def test_role_strict_mode(tgen):
    """Provider-undefined pair with strict-mode: r5's session must be torn down.

    Polls r5 until its view of 192.168.5.1 is not Established and records a
    Role Mismatch notification (see ``check_role_mismatch``).
    """
    # provider-undefined pair with strict-mode
    router = tgen.gears["r5"]
    neighbor_ip = "192.168.5.1"
    check_r5_mismatch = functools.partial(check_role_mismatch, router, neighbor_ip)
    # Only the success flag is needed; the last polled value is discarded.
    success, _ = topotest.run_and_expect(check_r5_mismatch, True, count=20, wait=3)
    assert success, "Session between r1 and r5 was not correctly closed"
d864dd9e EB |
150 | |
151 | ||
7dddd1f7 DA |
def test_correct_pair_peer_group(tgen):
    """Provider-customer pair configured through peer-groups (r1 towards r6).

    r1's session to 192.168.6.2 must be Established, with roles
    provider/customer and the role capability advertised and received.
    """
    # provider-customer pair (using peer-groups)
    r1 = tgen.gears["r1"]
    peer_ip = "192.168.6.2"

    session_is_up = functools.partial(check_session_established, r1, peer_ip)
    success, _ = topotest.run_and_expect(session_is_up, True, count=20, wait=3)
    assert success, "Session with r6 is not Established"

    status = find_neighbor_status(r1, peer_ip)
    assert status["localRole"] == "provider"
    assert status["remoteRole"] == "customer"
    role_capability = status["neighborCapabilities"].get("role")
    assert role_capability == "advertisedAndReceived"
168 | ||
169 | ||
d864dd9e EB |
if __name__ == "__main__":
    # Run this module under pytest with output capture disabled ("-s"),
    # forwarding any extra command-line arguments.
    exit_code = pytest.main(["-s", *sys.argv[1:]])
    sys.exit(exit_code)