]>
Commit | Line | Data |
---|---|---|
d0046e9a DA |
1 | #!/usr/bin/env python |
2 | ||
3 | # | |
4 | # Copyright (c) 2021 by | |
5 | # Donatas Abraitis <donatas.abraitis@gmail.com> | |
6 | # | |
7 | # Permission to use, copy, modify, and/or distribute this software | |
8 | # for any purpose with or without fee is hereby granted, provided | |
9 | # that the above copyright notice and this permission notice appear | |
10 | # in all copies. | |
11 | # | |
12 | # THE SOFTWARE IS PROVIDED "AS IS" AND NETDEF DISCLAIMS ALL WARRANTIES | |
13 | # WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF | |
14 | # MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NETDEF BE LIABLE FOR | |
15 | # ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY | |
16 | # DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, | |
17 | # WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS | |
18 | # ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE | |
19 | # OF THIS SOFTWARE. | |
20 | # | |
21 | ||
22 | """ | |
23 | Test if BGP UPDATE with AS-PATH attribute with value zero (0) | |
24 | is threated as withdrawal. | |
25 | """ | |
26 | ||
27 | import os | |
28 | import sys | |
29 | import json | |
d0046e9a DA |
30 | import pytest |
31 | import functools | |
32 | ||
33 | CWD = os.path.dirname(os.path.realpath(__file__)) | |
34 | sys.path.append(os.path.join(CWD, "../")) | |
35 | ||
36 | # pylint: disable=C0413 | |
37 | from lib import topotest | |
38 | from lib.topogen import Topogen, TopoRouter, get_topogen | |
d0046e9a DA |
39 | |
40 | pytestmark = [pytest.mark.bgpd] | |
41 | ||
42 | ||
e82b531d CH |
43 | def build_topo(tgen): |
44 | r1 = tgen.add_router("r1") | |
a53c08bc | 45 | peer1 = tgen.add_exabgp_peer("peer1", ip="10.0.0.2", defaultRoute="via 10.0.0.1") |
d0046e9a | 46 | |
e82b531d CH |
47 | switch = tgen.add_switch("s1") |
48 | switch.add_link(r1) | |
49 | switch.add_link(peer1) | |
d0046e9a DA |
50 | |
51 | ||
52 | def setup_module(mod): | |
e82b531d | 53 | tgen = Topogen(build_topo, mod.__name__) |
d0046e9a DA |
54 | tgen.start_topology() |
55 | ||
56 | router = tgen.gears["r1"] | |
57 | router.load_config(TopoRouter.RD_ZEBRA, os.path.join(CWD, "r1/zebra.conf")) | |
58 | router.load_config(TopoRouter.RD_BGP, os.path.join(CWD, "r1/bgpd.conf")) | |
59 | router.start() | |
60 | ||
61 | peer = tgen.gears["peer1"] | |
62 | peer.start(os.path.join(CWD, "peer1"), os.path.join(CWD, "exabgp.env")) | |
63 | ||
64 | ||
65 | def teardown_module(mod): | |
66 | tgen = get_topogen() | |
67 | tgen.stop_topology() | |
68 | ||
69 | ||
70 | def test_bgp_aggregator_zero(): | |
71 | tgen = get_topogen() | |
72 | ||
73 | if tgen.routers_have_failure(): | |
74 | pytest.skip(tgen.errors) | |
75 | ||
76 | def _bgp_converge(): | |
77 | output = json.loads( | |
78 | tgen.gears["r1"].vtysh_cmd("show ip bgp neighbor 10.0.0.2 json") | |
79 | ) | |
80 | expected = { | |
81 | "10.0.0.2": { | |
82 | "bgpState": "Established", | |
83 | "addressFamilyInfo": {"ipv4Unicast": {"acceptedPrefixCounter": 1}}, | |
84 | } | |
85 | } | |
86 | return topotest.json_cmp(output, expected) | |
87 | ||
88 | test_func = functools.partial(_bgp_converge) | |
89 | success, result = topotest.run_and_expect(test_func, None, count=60, wait=0.5) | |
90 | assert result is None, "More than one prefix seen at r1, SHOULD be only one." | |
91 | ||
92 | def _bgp_has_correct_routes_without_asn_0(): | |
93 | output = json.loads(tgen.gears["r1"].vtysh_cmd("show ip bgp json")) | |
94 | expected = {"routes": {"192.168.100.101/32": [{"valid": True}]}} | |
95 | return topotest.json_cmp(output, expected) | |
96 | ||
97 | test_func = functools.partial(_bgp_has_correct_routes_without_asn_0) | |
98 | success, result = topotest.run_and_expect(test_func, None, count=60, wait=0.5) | |
99 | assert result is None, "Failed listing 192.168.100.101/32, SHOULD be accepted." | |
100 | ||
101 | ||
102 | def test_memory_leak(): | |
103 | "Run the memory leak test and report results." | |
104 | tgen = get_topogen() | |
105 | if not tgen.is_memleak_enabled(): | |
106 | pytest.skip("Memory leak test/report is disabled") | |
107 | ||
108 | tgen.report_memory_leaks() | |
109 | ||
110 | ||
111 | if __name__ == "__main__": | |
112 | args = ["-s"] + sys.argv[1:] | |
113 | sys.exit(pytest.main(args)) |