]> git.proxmox.com Git - mirror_frr.git/blob - tests/topotests/lib/bgprib.py
bgpd: l3vni add-del handle non-defualt rt
[mirror_frr.git] / tests / topotests / lib / bgprib.py
1 #!/usr/bin/env python
2
3 # Copyright 2018, LabN Consulting, L.L.C.
4 #
5 # This program is free software; you can redistribute it and/or
6 # modify it under the terms of the GNU General Public License
7 # as published by the Free Software Foundation; either version 2
8 # of the License, or (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License along
16 # with this program; see the file COPYING; if not, write to the Free Software
17 # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
18
19 #
20 # want_rd_routes = [
21 # {'rd':'10:1', 'p':'5.1.0.0/24', 'n':'1.1.1.1'},
22 # {'rd':'10:1', 'p':'5.1.0.0/24', 'n':'1.1.1.1'},
23 #
24 # {'rd':'10:3', 'p':'5.1.0.0/24', 'n':'3.3.3.3'},
25 # ]
26 #
27 # ribRequireVpnRoutes('r2','Customer routes',want_rd_routes)
28 #
29 # want_unicast_routes = [
30 # {'p':'5.1.0.0/24', 'n':'1.1.1.1'},
31 # ]
32 #
33 # ribRequireUnicastRoutes('r1','ipv4','r1-cust1','Customer routes in vrf',want_unicast_routes)
34 # ribRequireUnicastRoutes('r1','ipv4','','Customer routes in default',want_unicast_routes)
35 #
36
37 from lutil import luCommand,luResult
38 import json
39 import re
40
41 # gpz: get rib in json form and compare against desired routes
42 class BgpRib:
43 def routes_include_wanted(self,pfxtbl,want,debug):
44 # helper function to RequireVpnRoutes
45 for pfx in pfxtbl.iterkeys():
46 if debug:
47 print 'trying pfx ' + pfx
48 if pfx != want['p']:
49 if debug:
50 print 'want pfx=' + want['p'] + ', not ' + pfx
51 continue
52 if debug:
53 print 'have pfx=' + pfx
54 for r in pfxtbl[pfx]:
55 if debug:
56 print 'trying route'
57 nexthops = r['nexthops']
58 for nh in nexthops:
59 if debug:
60 print 'trying nh ' + nh['ip']
61 if nh['ip'] == want['n']:
62 if debug:
63 print 'found ' + want['n']
64 return 1
65 else:
66 if debug:
67 print 'want nh=' + want['n'] + ', not ' + nh['ip']
68 if debug:
69 print 'missing route: pfx=' + want['p'] + ', nh=' + want['n']
70 return 0
71
72 def RequireVpnRoutes(self, target, title, wantroutes, debug=0):
73 import json
74 logstr = "RequireVpnRoutes " + str(wantroutes)
75 #non json form for humans
76 luCommand(target,'vtysh -c "show bgp ipv4 vpn"','.','None','Get VPN RIB (non-json)')
77 ret = luCommand(target,'vtysh -c "show bgp ipv4 vpn json"','.*','None','Get VPN RIB (json)')
78 if re.search(r'^\s*$', ret):
79 # degenerate case: empty json means no routes
80 if len(wantroutes) > 0:
81 luResult(target, False, title, logstr)
82 return
83 luResult(target, True, title, logstr)
84 rib = json.loads(ret)
85 rds = rib['routes']['routeDistinguishers']
86 for want in wantroutes:
87 found = 0
88 if debug:
89 print "want rd " + want['rd']
90 for rd in rds.iterkeys():
91 if rd != want['rd']:
92 continue
93 if debug:
94 print "found rd " + rd
95 table = rds[rd]
96 if self.routes_include_wanted(table,want,debug):
97 found = 1
98 break
99 if not found:
100 luResult(target, False, title, logstr)
101 return
102 luResult(target, True, title, logstr)
103
104 def RequireUnicastRoutes(self,target,afi,vrf,title,wantroutes,debug=0):
105 logstr = "RequireVpnRoutes " + str(wantroutes)
106 vrfstr = ''
107 if vrf != '':
108 vrfstr = 'vrf %s' % (vrf)
109
110 if (afi != 'ipv4') and (afi != 'ipv6'):
111 print "ERROR invalid afi";
112
113 cmdstr = 'show bgp %s %s unicast' % (vrfstr, afi)
114 #non json form for humans
115 cmd = 'vtysh -c "%s"' % cmdstr
116 luCommand(target,cmd,'.','None','Get %s %s RIB (non-json)' % (vrfstr, afi))
117 cmd = 'vtysh -c "%s json"' % cmdstr
118 ret = luCommand(target,cmd,'.*','None','Get %s %s RIB (json)' % (vrfstr, afi))
119 if re.search(r'^\s*$', ret):
120 # degenerate case: empty json means no routes
121 if len(wantroutes) > 0:
122 luResult(target, False, title, logstr)
123 return
124 luResult(target, True, title, logstr)
125 rib = json.loads(ret)
126 table = rib['routes']
127 for want in wantroutes:
128 if not self.routes_include_wanted(table,want,debug):
129 luResult(target, False, title, logstr)
130 return
131 luResult(target, True, title, logstr)
132
133
134 BgpRib=BgpRib()
135
136 def bgpribRequireVpnRoutes(target, title, wantroutes, debug=0):
137 BgpRib.RequireVpnRoutes(target, title, wantroutes, debug)
138
139 def bgpribRequireUnicastRoutes(target, afi, vrf, title, wantroutes, debug=0):
140 BgpRib.RequireUnicastRoutes(target, afi, vrf, title, wantroutes, debug)