]> git.proxmox.com Git - mirror_frr.git/blob - tests/topotests/lib/bgprib.py
Revert "Merge pull request #11127 from louis-6wind/bgp-leak"
[mirror_frr.git] / tests / topotests / lib / bgprib.py
1 #!/usr/bin/env python
2
3 # Copyright 2018, LabN Consulting, L.L.C.
4 #
5 # This program is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU General Public License
7 # as published by the Free Software Foundation; either version 2
8 # of the License, or (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License along
16 # with this program; see the file COPYING; if not, write to the Free Software
17 # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18
19 #
20 # want_rd_routes = [
21 # {'rd':'10:1', 'p':'5.1.0.0/24', 'n':'1.1.1.1', 'bp': True},
22 # {'rd':'10:1', 'p':'5.1.0.0/24', 'n':'1.1.1.1', 'bp': False},
23 #
24 # {'rd':'10:3', 'p':'5.1.0.0/24', 'n':'3.3.3.3'},
25 # ]
26 #
# bgpribRequireVpnRoutes('r2','Customer routes',want_rd_routes)
28 #
29 # want_unicast_routes = [
30 # {'p':'5.1.0.0/24', 'n':'1.1.1.1'},
31 # ]
32 #
# bgpribRequireUnicastRoutes('r1','ipv4','r1-cust1','Customer routes in vrf',want_unicast_routes)
# bgpribRequireUnicastRoutes('r1','ipv4','','Customer routes in default',want_unicast_routes)
35 #
36
37 from lib.lutil import luCommand, luResult, LUtil
38 import json
39 import re
40
41 # gpz: get rib in json form and compare against desired routes
class BgpRib:
    """Fetch a router's BGP RIB in JSON form and check it for wanted routes.

    Commands are run through ``luCommand`` and every check ends by reporting
    exactly one pass/fail outcome through ``luResult``.
    """

    def log(self, str):
        """Write ``str`` to the LUtil log with a "BgpRib: " prefix.

        The parameter name shadows the builtin ``str``; it is kept as-is so
        that existing keyword callers keep working.
        """
        LUtil.log("BgpRib: " + str)

    def routes_include_wanted(self, pfxtbl, want, debug):
        """Tell whether a prefix table contains the wanted route.

        Helper for RequireVpnRoutes and RequireUnicastRoutes.

        Args:
            pfxtbl: dict mapping a prefix string to a list of route dicts.
                Each route dict has a "nexthops" list of dicts carrying an
                "ip" key, and optionally a "bestpath" flag (treated as
                False when absent).
            want: dict with "p" (prefix) and "n" (next-hop address), plus an
                optional "bp" giving the required bestpath state.
            debug: when true, log every comparison that is made.

        Returns:
            1 if a matching route is present, otherwise 0.
        """
        for pfx, routes in pfxtbl.items():
            if debug:
                self.log("trying pfx %s" % pfx)
            if pfx != want["p"]:
                if debug:
                    self.log("want pfx=" + want["p"] + ", not " + pfx)
                continue
            if debug:
                self.log("have pfx=%s" % pfx)
            for r in routes:
                bp = r.get("bestpath", False)
                if debug:
                    self.log("trying route %s bp=%s" % (r, bp))
                for nh in r["nexthops"]:
                    if debug:
                        self.log("trying nh %s" % nh["ip"])
                    if nh["ip"] != want["n"]:
                        if debug:
                            self.log("want nh=" + want["n"] + ", not " + nh["ip"])
                        continue
                    if debug:
                        self.log("found %s" % want["n"])
                    # A want entry without "bp" accepts either bestpath state.
                    if bp == want.get("bp", bp):
                        return 1
                    if debug:
                        self.log("bestpath mismatch %s != %s" % (bp, want["bp"]))
        if debug:
            self.log("missing route: pfx=" + want["p"] + ", nh=" + want["n"])
        return 0

    def RequireVpnRoutes(self, target, title, wantroutes, debug=0):
        """Check that the IPv4 VPN RIB on ``target`` holds every wanted route.

        Args:
            target: router name handed to ``luCommand``/``luResult``.
            title: description of the check, handed to ``luResult``.
            wantroutes: list of dicts with "rd", "p", "n" and optional "bp".
            debug: when true, log the matching steps.

        Returns:
            None. The outcome is reported through a single ``luResult`` call.
        """
        logstr = "RequireVpnRoutes " + str(wantroutes)
        # non json form for humans
        luCommand(
            target,
            'vtysh -c "show bgp ipv4 vpn"',
            ".",
            "None",
            "Get VPN RIB (non-json)",
        )
        ret = luCommand(
            target,
            'vtysh -c "show bgp ipv4 vpn json"',
            ".*",
            "None",
            "Get VPN RIB (json)",
        )
        if re.search(r"^\s*$", ret):
            # Degenerate case: empty output means no routes. That is a pass
            # only when nothing was wanted. Return either way: blank text
            # cannot be parsed as JSON.
            luResult(target, len(wantroutes) == 0, title, logstr)
            return
        rib = json.loads(ret)
        rds = rib["routes"]["routeDistinguishers"]
        for want in wantroutes:
            want_rd = want["rd"]
            if debug:
                self.log("want rd %s" % want_rd)
            # RDs are dict keys, so at most one table can match.
            if want_rd not in rds:
                luResult(target, False, title, logstr)
                return
            if debug:
                self.log("found rd %s" % want_rd)
            if not self.routes_include_wanted(rds[want_rd], want, debug):
                luResult(target, False, title, logstr)
                return
        luResult(target, True, title, logstr)

    def RequireUnicastRoutes(self, target, afi, vrf, title, wantroutes, debug=0):
        """Check that a unicast RIB on ``target`` holds every wanted route.

        Args:
            target: router name handed to ``luCommand``/``luResult``.
            afi: "ipv4" or "ipv6"; any other value is logged as an error but
                the command is still attempted.
            vrf: VRF name, or "" to omit the vrf clause from the command.
            title: description of the check, handed to ``luResult``.
            wantroutes: list of dicts with "p", "n" and optional "bp".
            debug: when true, log the matching steps.

        Returns:
            None. The outcome is reported through a single ``luResult`` call.
        """
        logstr = "RequireUnicastRoutes %s" % str(wantroutes)
        vrfstr = ""
        if vrf != "":
            vrfstr = "vrf %s" % (vrf)

        if afi not in ("ipv4", "ipv6"):
            self.log("ERROR invalid afi")

        cmdstr = "show bgp %s %s unicast" % (vrfstr, afi)
        # non json form for humans
        cmd = 'vtysh -c "%s"' % cmdstr
        luCommand(target, cmd, ".", "None", "Get %s %s RIB (non-json)" % (vrfstr, afi))
        cmd = 'vtysh -c "%s json"' % cmdstr
        ret = luCommand(
            target, cmd, ".*", "None", "Get %s %s RIB (json)" % (vrfstr, afi)
        )
        if re.search(r"^\s*$", ret):
            # Degenerate case: empty output means no routes. That is a pass
            # only when nothing was wanted. Return either way: blank text
            # cannot be parsed as JSON.
            luResult(target, len(wantroutes) == 0, title, logstr)
            return
        rib = json.loads(ret)
        try:
            table = rib["routes"]
        except KeyError:
            # KeyError: 'routes' probably means missing/bad VRF
            if vrf != "":
                errstr = "-script ERROR: check if wrong vrf (%s)" % (vrf)
            else:
                errstr = "-script ERROR: check if vrf missing"
            luResult(target, False, title + errstr, logstr)
            return
        for want in wantroutes:
            if debug:
                self.log("want=%s" % want)
            if not self.routes_include_wanted(table, want, debug):
                luResult(target, False, title, logstr)
                return
        luResult(target, True, title, logstr)
167
168
# Rebind the name BgpRib from the class to a single instance of it; the
# module-level helper functions below call their methods on this instance.
BgpRib = BgpRib()
170
171
def bgpribRequireVpnRoutes(target, title, wantroutes, debug=0):
    """Function-style entry point for ``BgpRib.RequireVpnRoutes``.

    Passes every argument through unchanged to the module's ``BgpRib``
    object and returns nothing.
    """
    BgpRib.RequireVpnRoutes(target, title, wantroutes, debug=debug)
174
175
def bgpribRequireUnicastRoutes(target, afi, vrf, title, wantroutes, debug=0):
    """Function-style entry point for ``BgpRib.RequireUnicastRoutes``.

    Passes every argument through unchanged to the module's ``BgpRib``
    object and returns nothing.
    """
    BgpRib.RequireUnicastRoutes(target, afi, vrf, title, wantroutes, debug=debug)