]>
Commit | Line | Data |
---|---|---|
6b5a72a3 | 1 | #!/usr/bin/env python |
acddc0ed | 2 | # SPDX-License-Identifier: ISC |
6b5a72a3 DA |
3 | |
4 | # | |
5 | # bgp_local_as_private_remove.py | |
6 | # Part of NetDEF Topology Tests | |
7 | # | |
8 | # Copyright (c) 2019 by | |
9 | # Network Device Education Foundation, Inc. ("NetDEF") | |
10 | # | |
6b5a72a3 DA |
11 | |
12 | """ | |
13 | bgp_local_as_private_remove.py: | |
14 | Test if primary AS number is not removed in cases when `local-as` | |
15 | is used together with `remove-private-AS`. | |
16 | """ | |
17 | ||
18 | import os | |
19 | import sys | |
20 | import json | |
6b5a72a3 | 21 | import pytest |
27858b3b | 22 | import functools |
6b5a72a3 DA |
23 | |
24 | CWD = os.path.dirname(os.path.realpath(__file__)) | |
787e7624 | 25 | sys.path.append(os.path.join(CWD, "../")) |
6b5a72a3 DA |
26 | |
27 | # pylint: disable=C0413 | |
27858b3b | 28 | from lib import topotest |
6b5a72a3 | 29 | from lib.topogen import Topogen, TopoRouter, get_topogen |
6b5a72a3 | 30 | |
bf3a0a9a DS |
31 | pytestmark = [pytest.mark.bgpd] |
32 | ||
787e7624 | 33 | |
e82b531d CH |
def build_topo(tgen):
    """Create routers r1..r4 and connect them in pairs.

    r1 and r2 are attached to switch s1; r3 and r4 are attached to switch s2.
    """
    for index in range(1, 5):
        tgen.add_router("r{}".format(index))

    # (switch name, routers attached to it), wired in this order.
    wiring = (
        ("s1", ("r1", "r2")),
        ("s2", ("r3", "r4")),
    )
    for switch_name, members in wiring:
        switch = tgen.add_switch(switch_name)
        for member in members:
            switch.add_link(tgen.gears[member])
6b5a72a3 | 45 | |
6b5a72a3 DA |
46 | |
def setup_module(mod):
    """Build the topology, load per-router daemon configs and start the routers.

    mod is the test module object handed to this module-level setup hook;
    its __name__ is passed on to Topogen.
    """
    tgen = Topogen(build_topo, mod.__name__)
    tgen.start_topology()

    # Each router reads its zebra and bgpd configuration from a directory
    # named after the router, located next to this test file.
    for rname, router in tgen.routers().items():
        router.load_config(
            TopoRouter.RD_ZEBRA, os.path.join(CWD, "{}/zebra.conf".format(rname))
        )
        router.load_config(
            TopoRouter.RD_BGP, os.path.join(CWD, "{}/bgpd.conf".format(rname))
        )

    tgen.start_router()
62 | ||
787e7624 | 63 | |
6b5a72a3 DA |
def teardown_module(mod):
    """Stop the topology returned by get_topogen(); mod is unused."""
    get_topogen().stop_topology()
67 | ||
787e7624 | 68 | |
6b5a72a3 DA |
def test_bgp_remove_private_as():
    """Check the AS paths r2 and r4 report for 172.16.255.254/32.

    The test first waits until r2 reports its session with 192.168.255.1 as
    Established. It then polls r2 for AS path "500" (length 1) and r4 for
    AS path "500 3000" (length 2). Every poll goes through
    topotest.run_and_expect with count=60 and wait=0.5, expecting the probe
    to return None.
    """
    tgen = get_topogen()

    if tgen.routers_have_failure():
        pytest.skip(tgen.errors)

    r2 = tgen.gears["r2"]
    r4 = tgen.gears["r4"]

    def _bgp_converge():
        # Compare r2's view of the neighbor against the wanted session state.
        neighbor = json.loads(r2.vtysh_cmd("show ip bgp neighbor 192.168.255.1 json"))
        wanted = {"192.168.255.1": {"bgpState": "Established"}}
        return topotest.json_cmp(neighbor, wanted)

    test_func = functools.partial(_bgp_converge)
    _, result = topotest.run_and_expect(test_func, None, count=60, wait=0.5)
    assert result is None, "Can't converge initially"

    def _bgp_as_path(router, asn_path, asn_length):
        # Compare the router's entry for the prefix against the wanted AS path.
        route = json.loads(router.vtysh_cmd("show ip bgp 172.16.255.254/32 json"))
        aspath = {"string": asn_path, "length": asn_length}
        return topotest.json_cmp(route, {"paths": [{"aspath": aspath}]})

    # (router, expected AS path string, expected AS path length), checked in order.
    checks = (
        (r2, "500", 1),
        (r4, "500 3000", 2),
    )
    for router, asn_path, asn_length in checks:
        test_func = functools.partial(_bgp_as_path, router, asn_path, asn_length)
        _, result = topotest.run_and_expect(test_func, None, count=60, wait=0.5)
        assert result is None, "Private ASNs not stripped"
6b5a72a3 | 112 | |
6b5a72a3 | 113 | |
if __name__ == "__main__":
    # Running the file directly hands control to pytest, with "-s" placed
    # ahead of any arguments given on the command line.
    args = ["-s"]
    args.extend(sys.argv[1:])
    sys.exit(pytest.main(args))