]> git.proxmox.com Git - mirror_frr.git/blob - tests/topotests/bgp_snmp_mplsl3vpn/test_bgp_snmp_mplsvpn.py
Merge pull request #12798 from donaldsharp/rib_match_multicast
[mirror_frr.git] / tests / topotests / bgp_snmp_mplsl3vpn / test_bgp_snmp_mplsvpn.py
1 #!/usr/bin/env python
2 # SPDX-License-Identifier: ISC
3
4 #
5 # test_bgp_snmp_mplsl3vpn.py
6 # Part of NetDEF Topology Tests
7 #
8 # Copyright (c) 2020 by Volta Networks
9 #
10
11 """
12 test_bgp_snmp_mplsl3vpn.py: Test mplsL3Vpn MIB [RFC4382].
13 """
14
15 import os
16 import sys
17 from time import sleep
18 import pytest
19
20 # Save the Current Working Directory to find configuration files.
21 CWD = os.path.dirname(os.path.realpath(__file__))
22 sys.path.append(os.path.join(CWD, "../"))
23
24 # pylint: disable=C0413
25 # Import topogen and topotest helpers
26 from lib.topogen import Topogen, TopoRouter, get_topogen
27 from lib.snmptest import SnmpTester
28 from lib import topotest
29
30 # Required to instantiate the topology builder class.
31
32 pytestmark = [pytest.mark.bgpd, pytest.mark.isisd, pytest.mark.snmp]
33
34
35 def build_topo(tgen):
36 "Build function"
37
38 # This function only purpose is to define allocation and relationship
39 # between routers, switches and hosts.
40 #
41 #
42 # Create routers
43 tgen.add_router("r1")
44 tgen.add_router("r2")
45 tgen.add_router("r3")
46 tgen.add_router("r4")
47 tgen.add_router("ce1")
48 tgen.add_router("ce2")
49 tgen.add_router("ce3")
50 tgen.add_router("ce4")
51
52 # r1-r2
53 switch = tgen.add_switch("s1")
54 switch.add_link(tgen.gears["r1"])
55 switch.add_link(tgen.gears["r2"])
56
57 # r1-r3
58 switch = tgen.add_switch("s2")
59 switch.add_link(tgen.gears["r1"])
60 switch.add_link(tgen.gears["r3"])
61
62 # r1-r4
63 switch = tgen.add_switch("s3")
64 switch.add_link(tgen.gears["r1"])
65 switch.add_link(tgen.gears["r4"])
66
67 # r1-ce1
68 switch = tgen.add_switch("s4")
69 switch.add_link(tgen.gears["r1"])
70 switch.add_link(tgen.gears["ce1"])
71
72 # r1-ce3
73 switch = tgen.add_switch("s5")
74 switch.add_link(tgen.gears["r1"])
75 switch.add_link(tgen.gears["ce3"])
76
77 # r1-ce4
78 switch = tgen.add_switch("s6")
79 switch.add_link(tgen.gears["r1"])
80 switch.add_link(tgen.gears["ce4"])
81
82 # r1-dangling
83 switch = tgen.add_switch("s7")
84 switch.add_link(tgen.gears["r1"])
85
86 # r2-r3
87 switch = tgen.add_switch("s8")
88 switch.add_link(tgen.gears["r2"])
89 switch.add_link(tgen.gears["r3"])
90
91 # r3-r4
92 switch = tgen.add_switch("s9")
93 switch.add_link(tgen.gears["r3"])
94 switch.add_link(tgen.gears["r4"])
95
96 # r4-ce2
97 switch = tgen.add_switch("s10")
98 switch.add_link(tgen.gears["r4"])
99 switch.add_link(tgen.gears["ce2"])
100
101
102 def setup_module(mod):
103 "Sets up the pytest environment"
104
105 # skip tests is SNMP not installed
106 snmpd = os.system("which snmpd")
107 if snmpd:
108 error_msg = "SNMP not installed - skipping"
109 pytest.skip(error_msg)
110
111 # This function initiates the topology build with Topogen...
112 tgen = Topogen(build_topo, mod.__name__)
113 # ... and here it calls Mininet initialization functions.
114 tgen.start_topology()
115
116 r1 = tgen.gears["r1"]
117 r2 = tgen.gears["r2"]
118 r3 = tgen.gears["r3"]
119 r4 = tgen.gears["r4"]
120
121 # setup VRF-a in r1
122 r1.run("ip link add VRF-a type vrf table 1001")
123 r1.run("ip link set up dev VRF-a")
124 r1.run("ip link add VRF-b type vrf table 1002")
125 r1.run("ip link set up dev VRF-b")
126 r4.run("ip link add VRF-a type vrf table 1001")
127 r4.run("ip link set up dev VRF-a")
128
129 # enslave vrf interfaces
130 r1.run("ip link set r1-eth3 master VRF-a")
131 r1.run("ip link set r1-eth4 master VRF-a")
132 r1.run("ip link set r1-eth5 master VRF-b")
133 r4.run("ip link set r4-eth1 master VRF-a")
134
135 r1.run("sysctl -w net.ipv4.ip_forward=1")
136 r2.run("sysctl -w net.ipv4.ip_forward=1")
137 r3.run("sysctl -w net.ipv4.ip_forward=1")
138 r4.run("sysctl -w net.ipv4.ip_forward=1")
139 r1.run("sysctl -w net.mpls.conf.r1-eth0.input=1")
140 r1.run("sysctl -w net.mpls.conf.r1-eth1.input=1")
141 r1.run("sysctl -w net.mpls.conf.r1-eth2.input=1")
142
143 router_list = tgen.routers()
144
145 # For all registered routers, load the zebra configuration file
146 for rname, router in router_list.items():
147 router.load_config(
148 TopoRouter.RD_ZEBRA, os.path.join(CWD, "{}/zebra.conf".format(rname))
149 )
150 router.load_config(
151 TopoRouter.RD_ISIS, os.path.join(CWD, "{}/isisd.conf".format(rname))
152 )
153 router.load_config(
154 TopoRouter.RD_BGP,
155 os.path.join(CWD, "{}/bgpd.conf".format(rname)),
156 "-M snmp",
157 )
158 router.load_config(
159 TopoRouter.RD_SNMP,
160 os.path.join(CWD, "{}/snmpd.conf".format(rname)),
161 "-Le -Ivacm_conf,usmConf,iquery -V -DAgentX,trap",
162 )
163
164 # After loading the configurations, this function loads configured daemons.
165 tgen.start_router()
166
167
168 def teardown_module(mod):
169 "Teardown the pytest environment"
170 tgen = get_topogen()
171
172 # This function tears down the whole topology.
173 tgen.stop_topology()
174
175
176 # SNMP utilities - maybe move to lib
177 def snmp_uint32_to_oid(val):
178 oid1 = int(val / 16777216) % 256
179 oid2 = int(val / 65536) % 256
180 oid3 = int(val / 256) % 256
181 oid4 = int(val) % 256
182 return "%(oid1)s.%(oid2)s.%(oid3)s.%(oid4)s" % locals()
183
184
185 def snmp_oid_to_uint32(oid):
186 values = oid.split(".")
187 return (
188 (int(values[0]) * 16777216)
189 + (int(values[1]) * 65536)
190 + (int(values[2]) * 256)
191 + int(values[3])
192 )
193
194
195 def snmp_str_to_oid(str):
196 out_oid = ""
197 for char in str:
198 out_oid += "{}.".format(ord(char))
199 return out_oid.rstrip(".")
200
201
202 def snmp_oid_to_str(oid):
203 out_str = ""
204 oids = oid.split(".")
205 for char in oids:
206 out_str += "{}".format(chr(int(char)))
207 return out_str
208
209
210 def snmp_rte_oid(vrf, dtype, dest, plen, policy, ntype, nhop=0):
211 oid_1 = snmp_str_to_oid(vrf)
212 oid_2 = dtype
213 oid_3 = dest
214 oid_4 = plen
215 oid_5 = "0.{}".format(policy)
216 oid_6 = ntype
217 if ntype == 0:
218 oid_7 = ""
219 else:
220 oid_7 = ".{}".format(nhop)
221
222 return "{}.{}.{}.{}.{}.{}{}".format(oid_1, oid_2, oid_3, oid_4, oid_5, oid_6, oid_7)
223
224
225 def test_pe1_converge_evpn():
226 "Wait for protocol convergence"
227 tgen = get_topogen()
228
229 r1 = tgen.gears["r1"]
230
231 def _convergence():
232 r1 = tgen.gears["r1"]
233 r1_snmp = SnmpTester(r1, "10.1.1.1", "public", "2c")
234
235 return r1_snmp.test_oid("bgpVersion", "10")
236
237 _, result = topotest.run_and_expect(_convergence, True, count=20, wait=1)
238 assertmsg = "BGP SNMP does not seem to be running"
239 assert result, assertmsg
240
241 r1_snmp = SnmpTester(r1, "10.1.1.1", "public", "2c")
242 count = 0
243 passed = False
244 while count < 125:
245 if r1_snmp.test_oid_walk("bgpPeerLocalAddr.10.4.4.4", ["10.1.1.1"]):
246 passed = True
247 break
248 count += 1
249 sleep(1)
250 # tgen.mininet_cli()
251 assertmsg = "BGP Peer 10.4.4.4 did not connect"
252 assert passed, assertmsg
253
254
255 interfaces_up_test = {
256 "mplsL3VpnConfiguredVrfs": "2",
257 "mplsL3VpnActiveVrfs": "2",
258 "mplsL3VpnConnectedInterfaces": "3",
259 "mplsL3VpnNotificationEnable": "true(1)",
260 "mplsL3VpnVrfConfMaxPossRts": "0",
261 "mplsL3VpnVrfConfRteMxThrshTime": "0 seconds",
262 "mplsL3VpnIlllblRcvThrsh": "0",
263 }
264
265 interfaces_down_test = {
266 "mplsL3VpnConfiguredVrfs": "2",
267 "mplsL3VpnActiveVrfs": "1",
268 "mplsL3VpnConnectedInterfaces": "3",
269 "mplsL3VpnNotificationEnable": "true(1)",
270 "mplsL3VpnVrfConfMaxPossRts": "0",
271 "mplsL3VpnVrfConfRteMxThrshTime": "0 seconds",
272 "mplsL3VpnIlllblRcvThrsh": "0",
273 }
274
275
276 def test_r1_mplsvpn_scalars():
277 "check scalar values"
278 tgen = get_topogen()
279 r1 = tgen.gears["r1"]
280 r1_snmp = SnmpTester(r1, "10.1.1.1", "public", "2c")
281
282 for item in interfaces_up_test.keys():
283 assertmsg = "{} should be {}: value {}".format(
284 item, interfaces_up_test[item], r1_snmp.get_next(item)
285 )
286 assert r1_snmp.test_oid(item, interfaces_up_test[item]), assertmsg
287
288
289 def test_r1_mplsvpn_scalars_interface():
290 "check scalar interface changing values"
291 tgen = get_topogen()
292 r1 = tgen.gears["r1"]
293 r1_snmp = SnmpTester(r1, "10.1.1.1", "public", "2c")
294
295 r1.vtysh_cmd("conf t\ninterface r1-eth3\nshutdown")
296 r1.vtysh_cmd("conf t\ninterface r1-eth4\nshutdown")
297
298 for item in interfaces_up_test.keys():
299 assertmsg = "{} should be {}: value {}".format(
300 item, interfaces_down_test[item], r1_snmp.get_next(item)
301 )
302 assert r1_snmp.test_oid(item, interfaces_down_test[item]), assertmsg
303
304 r1.vtysh_cmd("conf t\ninterface r1-eth3\nno shutdown")
305 r1.vtysh_cmd("conf t\ninterface r1-eth4\nno shutdown")
306
307 for item in interfaces_up_test.keys():
308 assertmsg = "{} should be {}: value {}".format(
309 item, interfaces_up_test[item], r1_snmp.get_next(item)
310 )
311 assert r1_snmp.test_oid(item, interfaces_up_test[item]), assertmsg
312
313
314 def router_interface_get_ifindex(router, interface):
315 ifindex = 0
316 r_int_output = router.vtysh_cmd(
317 "show interface {}-{}".format(router.name, interface)
318 )
319 int_lines = r_int_output.splitlines()
320 for line in int_lines:
321 line_items = line.lstrip().split(" ")
322 if "index" in line_items[0]:
323 ifindex = line_items[1]
324 return ifindex
325
326
327 def generate_vrf_ifindex_oid(vrf, ifindex):
328
329 intoid = snmp_uint32_to_oid(int(ifindex))
330 vrfoid = snmp_str_to_oid(vrf)
331 oid = "{}.{}".format(vrfoid, intoid)
332
333 return oid
334
335
336 def generate_vrf_index_type_oid(vrf, index, type):
337 vrfoid = snmp_str_to_oid(vrf)
338 intoid = snmp_uint32_to_oid(int(index))
339 oid = "{}.{}.{}".format(vrfoid, intoid, type)
340
341 return oid
342
343
344 iftable_up_test = {
345 "mplsL3VpnIfVpnClassification": ["enterprise(2)", "enterprise(2)", "enterprise(2)"],
346 "mplsL3VpnIfConfStorageType": ["volatile(2)", "volatile(2)", "volatile(2)"],
347 "mplsL3VpnIfConfRowStatus": ["active(1)", "active(1)", "active(1)"],
348 }
349
350
351 def get_timetick_val(time):
352 return int(time.split(" ")[0].lstrip("(").rstrip(")"))
353
354
355 def test_r1_mplsvpn_IfTable():
356 "mplsL3VpnIf table values"
357
358 tgen = get_topogen()
359 r1 = tgen.gears["r1"]
360
361 r1_snmp = SnmpTester(r1, "10.1.1.1", "public", "2c")
362
363 # tgen.mininet_cli()
364 eth3_ifindex = router_interface_get_ifindex(r1, "eth3")
365 eth4_ifindex = router_interface_get_ifindex(r1, "eth4")
366 eth5_ifindex = router_interface_get_ifindex(r1, "eth5")
367
368 # get ifindex and make sure the oid is correct
369
370 oids = []
371 # generate oid
372 oids.append(generate_vrf_ifindex_oid("VRF-a", eth3_ifindex))
373 oids.append(generate_vrf_ifindex_oid("VRF-a", eth4_ifindex))
374 oids.append(generate_vrf_ifindex_oid("VRF-b", eth5_ifindex))
375
376 for item in iftable_up_test.keys():
377 assertmsg = "{} should be {} oids {} full dict {}:".format(
378 item, iftable_up_test[item], oids, r1_snmp.walk(item)
379 )
380 assert r1_snmp.test_oid_walk(item, iftable_up_test[item], oids), assertmsg
381
382 # an inactive vrf should not affect these values
383 r1.cmd("ip link set r1-eth5 down")
384
385 for item in iftable_up_test.keys():
386 assertmsg = "{} should be {} oids {} full dict {}:".format(
387 item, iftable_up_test[item], oids, r1_snmp.walk(item)
388 )
389 assert r1_snmp.test_oid_walk(item, iftable_up_test[item], oids), assertmsg
390
391 r1.cmd("ip link set r1-eth5 up")
392
393
394 vrftable_test = {
395 "mplsL3VpnVrfDescription": ["VRF-a", "VRF-b"],
396 "mplsL3VpnVrfRD": ['"10:1"', '"10:2"'],
397 "mplsL3VpnVrfOperStatus": ["up(1)", "up(1)"],
398 "mplsL3VpnVrfActiveInterfaces": ["2", "1"],
399 "mplsL3VpnVrfAssociatedInterfaces": ["2", "1"],
400 "mplsL3VpnVrfConfMidRteThresh": ["0", "0"],
401 "mplsL3VpnVrfConfHighRteThresh": ["0", "0"],
402 "mplsL3VpnVrfConfMaxRoutes": ["0", "0"],
403 "mplsL3VpnVrfConfRowStatus": ["active(1)", "active(1)"],
404 "mplsL3VpnVrfConfAdminStatus": ["up(1)", "up(1)"],
405 "mplsL3VpnVrfConfStorageType": ["volatile(2)", "volatile(2)"],
406 }
407
408
409 def test_r1_mplsvpn_VrfTable():
410 tgen = get_topogen()
411
412 r1 = tgen.gears["r1"]
413
414 r1_snmp = SnmpTester(r1, "10.1.1.1", "public", "2c")
415
416 # tgen.mininet_cli()
417
418 oids = []
419
420 oids.append(snmp_str_to_oid("VRF-a"))
421 oids.append(snmp_str_to_oid("VRF-b"))
422
423 # check items
424 for item in vrftable_test.keys():
425 assertmsg = "{} should be {} oids {} full dict {}:".format(
426 item, vrftable_test[item], oids, r1_snmp.walk(item)
427 )
428 assert r1_snmp.test_oid_walk(item, vrftable_test[item], oids), assertmsg
429
430 # check timetick set and stable
431 ts_a = r1_snmp.get("mplsL3VpnVrfCreationTime.{}".format(snmp_str_to_oid("VRF-a")))
432 ts_b = r1_snmp.get("mplsL3VpnVrfCreationTime.{}".format(snmp_str_to_oid("VRF-b")))
433 ts_val_a1 = get_timetick_val(ts_a)
434 ts_val_b1 = get_timetick_val(ts_b)
435 ts_a = r1_snmp.get("mplsL3VpnVrfCreationTime.{}".format(snmp_str_to_oid("VRF-a")))
436 ts_b = r1_snmp.get("mplsL3VpnVrfCreationTime.{}".format(snmp_str_to_oid("VRF-b")))
437 ts_val_a2 = get_timetick_val(ts_a)
438 ts_val_b2 = get_timetick_val(ts_b)
439
440 assertmsg = "timestamp values for VRF-a do not match {} {}".format(
441 ts_val_a1, ts_val_a2
442 )
443 assert ts_val_a1 == ts_val_a2, assertmsg
444 assertmsg = "timestamp values for VRF-b do not match {} {}".format(
445 ts_val_b1, ts_val_b2
446 )
447 assert ts_val_b1 == ts_val_b2, assertmsg
448
449 # take Last changed time, fiddle with active interfaces, ensure
450 # time changes and active interfaces change
451 ts_last = r1_snmp.get(
452 "mplsL3VpnVrfConfLastChanged.{}".format(snmp_str_to_oid("VRF-a"))
453 )
454 ts_val_last_1 = get_timetick_val(ts_last)
455 r1.vtysh_cmd("conf t\ninterface r1-eth3\nshutdown")
456 active_int = r1_snmp.get(
457 "mplsL3VpnVrfActiveInterfaces.{}".format(snmp_str_to_oid("VRF-a"))
458 )
459 assertmsg = "mplsL3VpnVrfActiveInterfaces incorrect should be 1 value {}".format(
460 active_int
461 )
462 assert active_int == "1", assertmsg
463
464 ts_last = r1_snmp.get(
465 "mplsL3VpnVrfConfLastChanged.{}".format(snmp_str_to_oid("VRF-a"))
466 )
467 ts_val_last_2 = get_timetick_val(ts_last)
468 assertmsg = "mplsL3VpnVrfConfLastChanged does not update on interface change"
469 assert ts_val_last_2 > ts_val_last_1, assertmsg
470 r1.vtysh_cmd("conf t\ninterface r1-eth3\nno shutdown")
471
472 # take Last changed time, fiddle with associated interfaces, ensure
473 # time changes and active interfaces change
474 ts_last = r1_snmp.get(
475 "mplsL3VpnVrfConfLastChanged.{}".format(snmp_str_to_oid("VRF-a"))
476 )
477 ts_val_last_1 = get_timetick_val(ts_last)
478 r1.cmd("ip link set r1-eth6 master VRF-a")
479 r1.cmd("ip link set r1-eth6 up")
480
481 associated_int = r1_snmp.get(
482 "mplsL3VpnVrfAssociatedInterfaces.{}".format(snmp_str_to_oid("VRF-a"))
483 )
484 assertmsg = (
485 "mplsL3VpnVrfAssociatedInterfaces incorrect should be 3 value {}".format(
486 associated_int
487 )
488 )
489
490 assert associated_int == "3", assertmsg
491 ts_last = r1_snmp.get(
492 "mplsL3VpnVrfConfLastChanged.{}".format(snmp_str_to_oid("VRF-a"))
493 )
494 ts_val_last_2 = get_timetick_val(ts_last)
495 assertmsg = "mplsL3VpnVrfConfLastChanged does not update on interface change"
496 assert ts_val_last_2 > ts_val_last_1, assertmsg
497 r1.cmd("ip link del r1-eth6 master VRF-a")
498 r1.cmd("ip link set r1-eth6 down")
499
500
501 rt_table_test = {
502 "mplsL3VpnVrfRT": ['"1:1"', '"1:2"'],
503 "mplsL3VpnVrfRTDescr": ["RT both for VRF VRF-a", "RT both for VRF VRF-b"],
504 "mplsL3VpnVrfRTRowStatus": ["active(1)", "active(1)"],
505 "mplsL3VpnVrfRTStorageType": ["volatile(2)", "volatile(2)"],
506 }
507
508
509 def test_r1_mplsvpn_VrfRT_table():
510 tgen = get_topogen()
511
512 r1 = tgen.gears["r1"]
513
514 r1_snmp = SnmpTester(r1, "10.1.1.1", "public", "2c")
515
516 oids = []
517 oids.append(generate_vrf_index_type_oid("VRF-a", 1, 3))
518 oids.append(generate_vrf_index_type_oid("VRF-b", 1, 3))
519
520 # check items
521 for item in rt_table_test.keys():
522 print(item)
523 assertmsg = "{} should be {} oids {} full dict {}:".format(
524 item, rt_table_test[item], oids, r1_snmp.walk(item)
525 )
526 assert r1_snmp.test_oid_walk(item, rt_table_test[item], oids), assertmsg
527
528
529 def test_r1_mplsvpn_perf_table():
530 tgen = get_topogen()
531
532 r1 = tgen.gears["r1"]
533
534 r1_snmp = SnmpTester(r1, "10.1.1.1", "public", "2c")
535
536 # tgen.mininet_cli()
537 oid_a = snmp_str_to_oid("VRF-a")
538 oid_b = snmp_str_to_oid("VRF-b")
539
540 # poll for 10 seconds for routes to appear
541 count = 0
542 passed = False
543 while count < 60:
544 if r1_snmp.test_oid_walk(
545 "mplsL3VpnVrfPerfCurrNumRoutes.{}".format(oid_a), ["7"]
546 ):
547 passed = True
548 break
549 count += 1
550 sleep(1)
551 # tgen.mininet_cli()
552 assertmsg = "mplsL3VpnVrfPerfCurrNumRoutes shouold be 7 got {}".format(
553 r1_snmp.get("mplsL3VpnVrfPerfCurrNumRoutes.{}".format(oid_a))
554 )
555 assert passed, assertmsg
556 curr_a = int(r1_snmp.get("mplsL3VpnVrfPerfCurrNumRoutes.{}".format(oid_a)))
557 del_a = int(r1_snmp.get("mplsL3VpnVrfPerfRoutesDeleted.{}".format(oid_a)))
558 add_a = int(r1_snmp.get("mplsL3VpnVrfPerfRoutesAdded.{}".format(oid_a)))
559
560 assertmsg = "FAIL curr{} does not equal added{} - deleted {}".format(
561 curr_a, add_a, del_a
562 )
563 assert curr_a == (add_a - del_a), assertmsg
564 curr_b = int(r1_snmp.get("mplsL3VpnVrfPerfCurrNumRoutes.{}".format(oid_b)))
565 del_b = int(r1_snmp.get("mplsL3VpnVrfPerfRoutesDeleted.{}".format(oid_b)))
566 add_b = int(r1_snmp.get("mplsL3VpnVrfPerfRoutesAdded.{}".format(oid_b)))
567 assertmsg = "FAIL curr{} does not equal added{} - deleted {}".format(
568 curr_b, add_b, del_b
569 )
570 assert curr_b == (add_b - del_b), assertmsg
571
572
573 rte_table_test = {
574 "mplsL3VpnVrfRteInetCidrDestType": [
575 "ipv4(1)",
576 "ipv4(1)",
577 "ipv4(1)",
578 "ipv4(1)",
579 "ipv4(1)",
580 "ipv4(1)",
581 "ipv4(1)",
582 ],
583 "mplsL3VpnVrfRteInetCidrDest": [
584 "0A 05 05 05",
585 "0A 07 07 07",
586 "C0 A8 22 00",
587 "C0 A8 64 00",
588 "C0 A8 64 00",
589 "C0 A8 C8 00",
590 "C0 A8 C8 00",
591 ],
592 "mplsL3VpnVrfRteInetCidrPfxLen": ["32", "32", "24", "24", "24", "24", "24"],
593 "mplsL3VpnVrfRteInetCidrNHopType": [
594 "ipv4(1)",
595 "ipv4(1)",
596 "ipv4(1)",
597 "ipv4(1)",
598 "unknown(0)",
599 "ipv4(1)",
600 "unknown(0)",
601 ],
602 "mplsL3VpnVrfRteInetCidrNextHop": [
603 "C0 A8 64 0A",
604 "C0 A8 C8 0A",
605 "0A 04 04 04",
606 "C0 A8 64 0A",
607 '""',
608 "C0 A8 C8 0A",
609 '""',
610 ],
611 "mplsL3VpnVrfRteInetCidrType": [
612 "local(3)",
613 "local(3)",
614 "remote(4)",
615 "local(3)",
616 "other(1)",
617 "local(3)",
618 "other(1)",
619 ],
620 "mplsL3VpnVrfRteInetCidrProto": [
621 "bgp(14)",
622 "bgp(14)",
623 "bgp(14)",
624 "bgp(14)",
625 "local(2)",
626 "bgp(14)",
627 "local(2)",
628 ],
629 "mplsL3VpnVrfRteInetCidrNextHopAS": [
630 "65001",
631 "65001",
632 "0",
633 "65001",
634 "0",
635 "65001",
636 "0",
637 ],
638 "mplsL3VpnVrfRteInetCidrMetric1": ["0", "0", "20", "0", "0", "0", "0"],
639 "mplsL3VpnVrfRteInetCidrMetric2": ["-1", "-1", "-1", "-1", "-1", "-1", "-1"],
640 "mplsL3VpnVrfRteInetCidrMetric3": ["-1", "-1", "-1", "-1", "-1", "-1", "-1"],
641 "mplsL3VpnVrfRteInetCidrMetric4": ["-1", "-1", "-1", "-1", "-1", "-1", "-1"],
642 "mplsL3VpnVrfRteInetCidrMetric5": ["-1", "-1", "-1", "-1", "-1", "-1", "-1"],
643 "mplsL3VpnVrfRteXCPointer": ["00", "00", "00", "00", "00", "00", "00"],
644 "mplsL3VpnVrfRteInetCidrStatus": [
645 "active(1)",
646 "active(1)",
647 "active(1)",
648 "active(1)",
649 "active(1)",
650 "active(1)",
651 "active(1)",
652 ],
653 }
654
655
656 def test_r1_mplsvpn_rte_table():
657 tgen = get_topogen()
658
659 r1 = tgen.gears["r1"]
660
661 r1_snmp = SnmpTester(r1, "10.1.1.1", "public", "2c")
662
663 # tgen.mininet_cli()
664 oid_1 = snmp_rte_oid("VRF-a", 1, "10.5.5.5", 32, 0, 1, "192.168.100.10")
665 oid_2 = snmp_rte_oid("VRF-a", 1, "10.7.7.7", 32, 0, 1, "192.168.200.10")
666 oid_3 = snmp_rte_oid("VRF-a", 1, "192.168.34.0", 24, 0, 1, "10.4.4.4")
667 oid_4 = snmp_rte_oid("VRF-a", 1, "192.168.100.0", 24, 1, 1, "192.168.100.10")
668 oid_4_a = snmp_rte_oid("VRF-a", 1, "192.168.100.0", 24, 0, 1, "192.168.100.10")
669 oid_5 = snmp_rte_oid("VRF-a", 1, "192.168.100.0", 24, 0, 0)
670 oid_5_a = snmp_rte_oid("VRF-a", 1, "192.168.100.0", 24, 1, 0)
671 oid_6 = snmp_rte_oid("VRF-a", 1, "192.168.200.0", 24, 1, 1, "192.168.200.10")
672 oid_6_a = snmp_rte_oid("VRF-a", 1, "192.168.200.0", 24, 0, 1, "192.168.200.10")
673 oid_7 = snmp_rte_oid("VRF-a", 1, "192.168.200.0", 24, 0, 0)
674 oid_7_a = snmp_rte_oid("VRF-a", 1, "192.168.200.0", 24, 1, 0)
675
676 oid_lists = [
677 [oid_1, oid_2, oid_3, oid_4, oid_5, oid_6, oid_7],
678 [oid_1, oid_2, oid_3, oid_4_a, oid_5_a, oid_6, oid_7],
679 [oid_1, oid_2, oid_3, oid_4, oid_5, oid_6_a, oid_7_a],
680 [oid_1, oid_2, oid_3, oid_4_a, oid_5_a, oid_6_a, oid_7_a],
681 [oid_1, oid_2, oid_3, oid_4, oid_5, oid_6, oid_7],
682 [oid_1, oid_2, oid_3, oid_4_a, oid_5_a, oid_6, oid_7],
683 [oid_1, oid_2, oid_3, oid_4, oid_5, oid_6_a, oid_7_a],
684 [oid_1, oid_2, oid_3, oid_4_a, oid_5_a, oid_6_a, oid_7_a],
685 ]
686
687 # check items
688
689 passed = False
690 for oid_list in oid_lists:
691 passed = True
692 for item in rte_table_test.keys():
693 print(item)
694 assertmsg = "{} should be {} oids {} full dict {}:".format(
695 item, rte_table_test[item], oid_list, r1_snmp.walk(item)
696 )
697 if not r1_snmp.test_oid_walk(item, rte_table_test[item], oid_list):
698 passed = False
699 break
700 print(
701 "{} should be {} oids {} full dict {}:".format(
702 item, rte_table_test[item], oid_list, r1_snmp.walk(item)
703 )
704 )
705 if passed:
706 break
707 # generate ifindex row grabbing ifindices from vtysh
708 if passed:
709 ifindex_row = [
710 router_interface_get_ifindex(r1, "eth3"),
711 router_interface_get_ifindex(r1, "eth4"),
712 router_interface_get_ifindex(r1, "eth2"),
713 router_interface_get_ifindex(r1, "eth3"),
714 "0",
715 router_interface_get_ifindex(r1, "eth4"),
716 "0",
717 ]
718 if not r1_snmp.test_oid_walk(
719 "mplsL3VpnVrfRteInetCidrIfIndex", ifindex_row, oid_list
720 ):
721 passed = False
722
723 print("passed {}".format(passed))
724 assert passed, assertmsg
725
726
727 def test_memory_leak():
728 "Run the memory leak test and report results."
729 tgen = get_topogen()
730 if not tgen.is_memleak_enabled():
731 pytest.skip("Memory leak test/report is disabled")
732
733 tgen.report_memory_leaks()
734
735
736 if __name__ == "__main__":
737 args = ["-s"] + sys.argv[1:]
738 sys.exit(pytest.main(args))