]>
Commit | Line | Data |
---|---|---|
21b432f7 | 1 | #!/usr/bin/env python |
acddc0ed | 2 | # SPDX-License-Identifier: ISC |
21b432f7 DS |
3 | # |
4 | # Copyright (c) 2022 by | |
5 | # Donald Sharp | |
6 | # | |
21b432f7 DS |
7 | |
8 | """ | |
9 | Test some bgp interface based issues that show up | |
10 | """ | |
11 | ||
12 | import os | |
13 | import sys | |
14 | import json | |
15 | import pytest | |
16 | import functools | |
17 | ||
18 | CWD = os.path.dirname(os.path.realpath(__file__)) | |
19 | sys.path.append(os.path.join(CWD, "../")) | |
20 | ||
21 | # pylint: disable=C0413 | |
22 | from lib import topotest | |
23 | from lib.topogen import Topogen, TopoRouter, get_topogen | |
24 | from lib.common_config import step | |
25 | ||
26 | pytestmark = [pytest.mark.bgpd] | |
27 | ||
28 | ||
def build_topo(tgen):
    """Describe the test topology: routers r1 and r2 attached to switch s1.

    tgen is the topology generator object handed in by Topogen; routers are
    created first, then the switch, then one link per router on that switch.
    """
    router_names = ("r1", "r2")

    for name in router_names:
        tgen.add_router(name)

    s1 = tgen.add_switch("s1")
    for name in router_names:
        # Routers are looked up through tgen.gears after creation.
        s1.add_link(tgen.gears[name])
37 | ||
38 | ||
def setup_module(mod):
    """Build the topology, load each router's daemon configs and start them.

    mod is the test module object supplied by pytest; its __name__ is passed
    to Topogen together with build_topo.  For every router, zebra.conf and
    bgpd.conf are read from the "<router name>/" directory next to this file.
    """
    tgen = Topogen(build_topo, mod.__name__)
    tgen.start_topology()

    # The positional index previously produced by enumerate() was never used,
    # so iterate over the (name, router) pairs directly.
    for rname, router in tgen.routers().items():
        router.load_config(
            TopoRouter.RD_ZEBRA, os.path.join(CWD, "{}/zebra.conf".format(rname))
        )
        router.load_config(
            TopoRouter.RD_BGP, os.path.join(CWD, "{}/bgpd.conf".format(rname))
        )

    tgen.start_router()
54 | ||
55 | ||
def teardown_module(mod):
    """Stop the topology of the current Topogen instance.

    mod is supplied by pytest and is not used here.
    """
    get_topogen().stop_topology()
59 | ||
60 | ||
61 | # | |
62 | # Test these events: | |
63 | # a) create an unnumbered neighbor | |
64 | # b) shutdown the interface | |
65 | # c) remove the unnumbered peer in bgp and bgp does not crash | |
def test_bgp_unnumbered_removal():
    """Remove an unnumbered neighbor on a shut-down interface; bgpd must survive.

    Steps:
      1. wait until r1 reports the prefix 172.16.255.254/32 in BGP,
      2. shut down interface r1-eth0 on r1,
      3. remove the interface-based neighbor from r1's BGP configuration,
      4. poll "show bgp nexthop" on r1 until it returns only the cache header.
    """
    tgen = get_topogen()

    if tgen.routers_have_failure():
        pytest.skip(tgen.errors)

    r1 = tgen.gears["r1"]

    def _bgp_nexthop_cache():
        # True only when the command output is exactly the header line, i.e.
        # the daemon answered and listed no nexthop entries.
        output = r1.vtysh_cmd("show bgp nexthop")
        expected = "Current BGP nexthop cache:\n"
        return output == expected

    def _bgp_converge():
        # json_cmp's result is compared against None by run_and_expect below.
        output = json.loads(r1.vtysh_cmd("show ip bgp 172.16.255.254/32 json"))
        expected = {"prefix": "172.16.255.254/32"}

        return topotest.json_cmp(output, expected)

    step("Ensure Convergence of BGP")
    # The first element of the returned pair is not needed; only the last
    # result is inspected.
    _, result = topotest.run_and_expect(_bgp_converge, None, count=60, wait=1)

    # The convergence check queries r1, so report r1 in the failure message.
    assert result is None, 'Failed bgp convergence in "{}"'.format(r1)

    step("Shutdown interface r1-eth0")

    r1.vtysh_cmd(
        """
        configure
        int r1-eth0
        shutdown
        """
    )

    step("Remove the neighbor from r1")
    r1.vtysh_cmd(
        """
        configure
        router bgp
        no neighbor r1-eth0 interface remote-as external
        """
    )

    step("Ensure that BGP does not crash")
    _, result = topotest.run_and_expect(_bgp_nexthop_cache, True, count=10, wait=1)

    # This message is shown when the check FAILS, so it must describe the
    # failure rather than the expected outcome.
    assert (
        result is True
    ), "BGP crashed or its nexthop cache was not empty on r1 after neighbor removal"
115 | ||
116 | ||
if __name__ == "__main__":
    # Run this file through pytest directly, always passing "-s" ahead of
    # any arguments given on the command line.
    pytest_args = ["-s", *sys.argv[1:]]
    exit_code = pytest.main(pytest_args)
    sys.exit(exit_code)