]>
git.proxmox.com Git - mirror_frr.git/blob - tests/topotests/bgp_snmp_mplsl3vpn/test_bgp_snmp_mplsvpn.py
2 # SPDX-License-Identifier: ISC
5 # test_bgp_snmp_mplsl3vpn.py
6 # Part of NetDEF Topology Tests
8 # Copyright (c) 2020 by Volta Networks
12 test_bgp_snmp_mplsl3vpn.py: Test mplsL3Vpn MIB [RFC4382].
17 from time
import sleep
20 # Save the Current Working Directory to find configuration files.
21 CWD
= os
.path
.dirname(os
.path
.realpath(__file__
))
22 sys
.path
.append(os
.path
.join(CWD
, "../"))
24 # pylint: disable=C0413
25 # Import topogen and topotest helpers
26 from lib
.topogen
import Topogen
, TopoRouter
, get_topogen
27 from lib
.snmptest
import SnmpTester
28 from lib
import topotest
30 # Required to instantiate the topology builder class.
32 pytestmark
= [pytest
.mark
.bgpd
, pytest
.mark
.isisd
, pytest
.mark
.snmp
]
38 # This function only purpose is to define allocation and relationship
39 # between routers, switches and hosts.
47 tgen
.add_router("ce1")
48 tgen
.add_router("ce2")
49 tgen
.add_router("ce3")
50 tgen
.add_router("ce4")
53 switch
= tgen
.add_switch("s1")
54 switch
.add_link(tgen
.gears
["r1"])
55 switch
.add_link(tgen
.gears
["r2"])
58 switch
= tgen
.add_switch("s2")
59 switch
.add_link(tgen
.gears
["r1"])
60 switch
.add_link(tgen
.gears
["r3"])
63 switch
= tgen
.add_switch("s3")
64 switch
.add_link(tgen
.gears
["r1"])
65 switch
.add_link(tgen
.gears
["r4"])
68 switch
= tgen
.add_switch("s4")
69 switch
.add_link(tgen
.gears
["r1"])
70 switch
.add_link(tgen
.gears
["ce1"])
73 switch
= tgen
.add_switch("s5")
74 switch
.add_link(tgen
.gears
["r1"])
75 switch
.add_link(tgen
.gears
["ce3"])
78 switch
= tgen
.add_switch("s6")
79 switch
.add_link(tgen
.gears
["r1"])
80 switch
.add_link(tgen
.gears
["ce4"])
83 switch
= tgen
.add_switch("s7")
84 switch
.add_link(tgen
.gears
["r1"])
87 switch
= tgen
.add_switch("s8")
88 switch
.add_link(tgen
.gears
["r2"])
89 switch
.add_link(tgen
.gears
["r3"])
92 switch
= tgen
.add_switch("s9")
93 switch
.add_link(tgen
.gears
["r3"])
94 switch
.add_link(tgen
.gears
["r4"])
97 switch
= tgen
.add_switch("s10")
98 switch
.add_link(tgen
.gears
["r4"])
99 switch
.add_link(tgen
.gears
["ce2"])
102 def setup_module(mod
):
103 "Sets up the pytest environment"
105 # skip tests is SNMP not installed
106 snmpd
= os
.system("which snmpd")
108 error_msg
= "SNMP not installed - skipping"
109 pytest
.skip(error_msg
)
111 # This function initiates the topology build with Topogen...
112 tgen
= Topogen(build_topo
, mod
.__name
__)
113 # ... and here it calls Mininet initialization functions.
114 tgen
.start_topology()
116 r1
= tgen
.gears
["r1"]
117 r2
= tgen
.gears
["r2"]
118 r3
= tgen
.gears
["r3"]
119 r4
= tgen
.gears
["r4"]
122 r1
.run("ip link add VRF-a type vrf table 1001")
123 r1
.run("ip link set up dev VRF-a")
124 r1
.run("ip link add VRF-b type vrf table 1002")
125 r1
.run("ip link set up dev VRF-b")
126 r4
.run("ip link add VRF-a type vrf table 1001")
127 r4
.run("ip link set up dev VRF-a")
129 # enslave vrf interfaces
130 r1
.run("ip link set r1-eth3 master VRF-a")
131 r1
.run("ip link set r1-eth4 master VRF-a")
132 r1
.run("ip link set r1-eth5 master VRF-b")
133 r4
.run("ip link set r4-eth1 master VRF-a")
135 r1
.run("sysctl -w net.ipv4.ip_forward=1")
136 r2
.run("sysctl -w net.ipv4.ip_forward=1")
137 r3
.run("sysctl -w net.ipv4.ip_forward=1")
138 r4
.run("sysctl -w net.ipv4.ip_forward=1")
139 r1
.run("sysctl -w net.mpls.conf.r1-eth0.input=1")
140 r1
.run("sysctl -w net.mpls.conf.r1-eth1.input=1")
141 r1
.run("sysctl -w net.mpls.conf.r1-eth2.input=1")
143 router_list
= tgen
.routers()
145 # For all registered routers, load the zebra configuration file
146 for rname
, router
in router_list
.items():
148 TopoRouter
.RD_ZEBRA
, os
.path
.join(CWD
, "{}/zebra.conf".format(rname
))
151 TopoRouter
.RD_ISIS
, os
.path
.join(CWD
, "{}/isisd.conf".format(rname
))
155 os
.path
.join(CWD
, "{}/bgpd.conf".format(rname
)),
160 os
.path
.join(CWD
, "{}/snmpd.conf".format(rname
)),
161 "-Le -Ivacm_conf,usmConf,iquery -V -DAgentX,trap",
164 # After loading the configurations, this function loads configured daemons.
168 def teardown_module(mod
):
169 "Teardown the pytest environment"
172 # This function tears down the whole topology.
176 # SNMP utilities - maybe move to lib
177 def snmp_uint32_to_oid(val
):
178 oid1
= int(val
/ 16777216) % 256
179 oid2
= int(val
/ 65536) % 256
180 oid3
= int(val
/ 256) % 256
181 oid4
= int(val
) % 256
182 return "%(oid1)s.%(oid2)s.%(oid3)s.%(oid4)s" % locals()
185 def snmp_oid_to_uint32(oid
):
186 values
= oid
.split(".")
188 (int(values
[0]) * 16777216)
189 + (int(values
[1]) * 65536)
190 + (int(values
[2]) * 256)
195 def snmp_str_to_oid(str):
198 out_oid
+= "{}.".format(ord(char
))
199 return out_oid
.rstrip(".")
202 def snmp_oid_to_str(oid
):
204 oids
= oid
.split(".")
206 out_str
+= "{}".format(chr(int(char
)))
210 def snmp_rte_oid(vrf
, dtype
, dest
, plen
, policy
, ntype
, nhop
=0):
211 oid_1
= snmp_str_to_oid(vrf
)
215 oid_5
= "0.{}".format(policy
)
220 oid_7
= ".{}".format(nhop
)
222 return "{}.{}.{}.{}.{}.{}{}".format(oid_1
, oid_2
, oid_3
, oid_4
, oid_5
, oid_6
, oid_7
)
225 def test_pe1_converge_evpn():
226 "Wait for protocol convergence"
229 r1
= tgen
.gears
["r1"]
232 r1
= tgen
.gears
["r1"]
233 r1_snmp
= SnmpTester(r1
, "10.1.1.1", "public", "2c")
235 return r1_snmp
.test_oid("bgpVersion", "10")
237 _
, result
= topotest
.run_and_expect(_convergence
, True, count
=20, wait
=1)
238 assertmsg
= "BGP SNMP does not seem to be running"
239 assert result
, assertmsg
241 r1_snmp
= SnmpTester(r1
, "10.1.1.1", "public", "2c")
245 if r1_snmp
.test_oid_walk("bgpPeerLocalAddr.10.4.4.4", ["10.1.1.1"]):
251 assertmsg
= "BGP Peer 10.4.4.4 did not connect"
252 assert passed
, assertmsg
255 interfaces_up_test
= {
256 "mplsL3VpnConfiguredVrfs": "2",
257 "mplsL3VpnActiveVrfs": "2",
258 "mplsL3VpnConnectedInterfaces": "3",
259 "mplsL3VpnNotificationEnable": "true(1)",
260 "mplsL3VpnVrfConfMaxPossRts": "0",
261 "mplsL3VpnVrfConfRteMxThrshTime": "0 seconds",
262 "mplsL3VpnIlllblRcvThrsh": "0",
265 interfaces_down_test
= {
266 "mplsL3VpnConfiguredVrfs": "2",
267 "mplsL3VpnActiveVrfs": "1",
268 "mplsL3VpnConnectedInterfaces": "3",
269 "mplsL3VpnNotificationEnable": "true(1)",
270 "mplsL3VpnVrfConfMaxPossRts": "0",
271 "mplsL3VpnVrfConfRteMxThrshTime": "0 seconds",
272 "mplsL3VpnIlllblRcvThrsh": "0",
276 def test_r1_mplsvpn_scalars():
277 "check scalar values"
279 r1
= tgen
.gears
["r1"]
280 r1_snmp
= SnmpTester(r1
, "10.1.1.1", "public", "2c")
282 for item
in interfaces_up_test
.keys():
283 assertmsg
= "{} should be {}: value {}".format(
284 item
, interfaces_up_test
[item
], r1_snmp
.get_next(item
)
286 assert r1_snmp
.test_oid(item
, interfaces_up_test
[item
]), assertmsg
289 def test_r1_mplsvpn_scalars_interface():
290 "check scalar interface changing values"
292 r1
= tgen
.gears
["r1"]
293 r1_snmp
= SnmpTester(r1
, "10.1.1.1", "public", "2c")
295 r1
.vtysh_cmd("conf t\ninterface r1-eth3\nshutdown")
296 r1
.vtysh_cmd("conf t\ninterface r1-eth4\nshutdown")
298 for item
in interfaces_up_test
.keys():
299 assertmsg
= "{} should be {}: value {}".format(
300 item
, interfaces_down_test
[item
], r1_snmp
.get_next(item
)
302 assert r1_snmp
.test_oid(item
, interfaces_down_test
[item
]), assertmsg
304 r1
.vtysh_cmd("conf t\ninterface r1-eth3\nno shutdown")
305 r1
.vtysh_cmd("conf t\ninterface r1-eth4\nno shutdown")
307 for item
in interfaces_up_test
.keys():
308 assertmsg
= "{} should be {}: value {}".format(
309 item
, interfaces_up_test
[item
], r1_snmp
.get_next(item
)
311 assert r1_snmp
.test_oid(item
, interfaces_up_test
[item
]), assertmsg
314 def router_interface_get_ifindex(router
, interface
):
316 r_int_output
= router
.vtysh_cmd(
317 "show interface {}-{}".format(router
.name
, interface
)
319 int_lines
= r_int_output
.splitlines()
320 for line
in int_lines
:
321 line_items
= line
.lstrip().split(" ")
322 if "index" in line_items
[0]:
323 ifindex
= line_items
[1]
327 def generate_vrf_ifindex_oid(vrf
, ifindex
):
329 intoid
= snmp_uint32_to_oid(int(ifindex
))
330 vrfoid
= snmp_str_to_oid(vrf
)
331 oid
= "{}.{}".format(vrfoid
, intoid
)
336 def generate_vrf_index_type_oid(vrf
, index
, type):
337 vrfoid
= snmp_str_to_oid(vrf
)
338 intoid
= snmp_uint32_to_oid(int(index
))
339 oid
= "{}.{}.{}".format(vrfoid
, intoid
, type)
345 "mplsL3VpnIfVpnClassification": ["enterprise(2)", "enterprise(2)", "enterprise(2)"],
346 "mplsL3VpnIfConfStorageType": ["volatile(2)", "volatile(2)", "volatile(2)"],
347 "mplsL3VpnIfConfRowStatus": ["active(1)", "active(1)", "active(1)"],
351 def get_timetick_val(time
):
352 return int(time
.split(" ")[0].lstrip("(").rstrip(")"))
355 def test_r1_mplsvpn_IfTable():
356 "mplsL3VpnIf table values"
359 r1
= tgen
.gears
["r1"]
361 r1_snmp
= SnmpTester(r1
, "10.1.1.1", "public", "2c")
364 eth3_ifindex
= router_interface_get_ifindex(r1
, "eth3")
365 eth4_ifindex
= router_interface_get_ifindex(r1
, "eth4")
366 eth5_ifindex
= router_interface_get_ifindex(r1
, "eth5")
368 # get ifindex and make sure the oid is correct
372 oids
.append(generate_vrf_ifindex_oid("VRF-a", eth3_ifindex
))
373 oids
.append(generate_vrf_ifindex_oid("VRF-a", eth4_ifindex
))
374 oids
.append(generate_vrf_ifindex_oid("VRF-b", eth5_ifindex
))
376 for item
in iftable_up_test
.keys():
377 assertmsg
= "{} should be {} oids {} full dict {}:".format(
378 item
, iftable_up_test
[item
], oids
, r1_snmp
.walk(item
)
380 assert r1_snmp
.test_oid_walk(item
, iftable_up_test
[item
], oids
), assertmsg
382 # an inactive vrf should not affect these values
383 r1
.cmd("ip link set r1-eth5 down")
385 for item
in iftable_up_test
.keys():
386 assertmsg
= "{} should be {} oids {} full dict {}:".format(
387 item
, iftable_up_test
[item
], oids
, r1_snmp
.walk(item
)
389 assert r1_snmp
.test_oid_walk(item
, iftable_up_test
[item
], oids
), assertmsg
391 r1
.cmd("ip link set r1-eth5 up")
395 "mplsL3VpnVrfDescription": ["VRF-a", "VRF-b"],
396 "mplsL3VpnVrfRD": ['"10:1"', '"10:2"'],
397 "mplsL3VpnVrfOperStatus": ["up(1)", "up(1)"],
398 "mplsL3VpnVrfActiveInterfaces": ["2", "1"],
399 "mplsL3VpnVrfAssociatedInterfaces": ["2", "1"],
400 "mplsL3VpnVrfConfMidRteThresh": ["0", "0"],
401 "mplsL3VpnVrfConfHighRteThresh": ["0", "0"],
402 "mplsL3VpnVrfConfMaxRoutes": ["0", "0"],
403 "mplsL3VpnVrfConfRowStatus": ["active(1)", "active(1)"],
404 "mplsL3VpnVrfConfAdminStatus": ["up(1)", "up(1)"],
405 "mplsL3VpnVrfConfStorageType": ["volatile(2)", "volatile(2)"],
409 def test_r1_mplsvpn_VrfTable():
412 r1
= tgen
.gears
["r1"]
414 r1_snmp
= SnmpTester(r1
, "10.1.1.1", "public", "2c")
420 oids
.append(snmp_str_to_oid("VRF-a"))
421 oids
.append(snmp_str_to_oid("VRF-b"))
424 for item
in vrftable_test
.keys():
425 assertmsg
= "{} should be {} oids {} full dict {}:".format(
426 item
, vrftable_test
[item
], oids
, r1_snmp
.walk(item
)
428 assert r1_snmp
.test_oid_walk(item
, vrftable_test
[item
], oids
), assertmsg
430 # check timetick set and stable
431 ts_a
= r1_snmp
.get("mplsL3VpnVrfCreationTime.{}".format(snmp_str_to_oid("VRF-a")))
432 ts_b
= r1_snmp
.get("mplsL3VpnVrfCreationTime.{}".format(snmp_str_to_oid("VRF-b")))
433 ts_val_a1
= get_timetick_val(ts_a
)
434 ts_val_b1
= get_timetick_val(ts_b
)
435 ts_a
= r1_snmp
.get("mplsL3VpnVrfCreationTime.{}".format(snmp_str_to_oid("VRF-a")))
436 ts_b
= r1_snmp
.get("mplsL3VpnVrfCreationTime.{}".format(snmp_str_to_oid("VRF-b")))
437 ts_val_a2
= get_timetick_val(ts_a
)
438 ts_val_b2
= get_timetick_val(ts_b
)
440 assertmsg
= "timestamp values for VRF-a do not match {} {}".format(
443 assert ts_val_a1
== ts_val_a2
, assertmsg
444 assertmsg
= "timestamp values for VRF-b do not match {} {}".format(
447 assert ts_val_b1
== ts_val_b2
, assertmsg
449 # take Last changed time, fiddle with active interfaces, ensure
450 # time changes and active interfaces change
451 ts_last
= r1_snmp
.get(
452 "mplsL3VpnVrfConfLastChanged.{}".format(snmp_str_to_oid("VRF-a"))
454 ts_val_last_1
= get_timetick_val(ts_last
)
455 r1
.vtysh_cmd("conf t\ninterface r1-eth3\nshutdown")
456 active_int
= r1_snmp
.get(
457 "mplsL3VpnVrfActiveInterfaces.{}".format(snmp_str_to_oid("VRF-a"))
459 assertmsg
= "mplsL3VpnVrfActiveInterfaces incorrect should be 1 value {}".format(
462 assert active_int
== "1", assertmsg
464 ts_last
= r1_snmp
.get(
465 "mplsL3VpnVrfConfLastChanged.{}".format(snmp_str_to_oid("VRF-a"))
467 ts_val_last_2
= get_timetick_val(ts_last
)
468 assertmsg
= "mplsL3VpnVrfConfLastChanged does not update on interface change"
469 assert ts_val_last_2
> ts_val_last_1
, assertmsg
470 r1
.vtysh_cmd("conf t\ninterface r1-eth3\nno shutdown")
472 # take Last changed time, fiddle with associated interfaces, ensure
473 # time changes and active interfaces change
474 ts_last
= r1_snmp
.get(
475 "mplsL3VpnVrfConfLastChanged.{}".format(snmp_str_to_oid("VRF-a"))
477 ts_val_last_1
= get_timetick_val(ts_last
)
478 r1
.cmd("ip link set r1-eth6 master VRF-a")
479 r1
.cmd("ip link set r1-eth6 up")
481 associated_int
= r1_snmp
.get(
482 "mplsL3VpnVrfAssociatedInterfaces.{}".format(snmp_str_to_oid("VRF-a"))
485 "mplsL3VpnVrfAssociatedInterfaces incorrect should be 3 value {}".format(
490 assert associated_int
== "3", assertmsg
491 ts_last
= r1_snmp
.get(
492 "mplsL3VpnVrfConfLastChanged.{}".format(snmp_str_to_oid("VRF-a"))
494 ts_val_last_2
= get_timetick_val(ts_last
)
495 assertmsg
= "mplsL3VpnVrfConfLastChanged does not update on interface change"
496 assert ts_val_last_2
> ts_val_last_1
, assertmsg
497 r1
.cmd("ip link del r1-eth6 master VRF-a")
498 r1
.cmd("ip link set r1-eth6 down")
502 "mplsL3VpnVrfRT": ['"1:1"', '"1:2"'],
503 "mplsL3VpnVrfRTDescr": ["RT both for VRF VRF-a", "RT both for VRF VRF-b"],
504 "mplsL3VpnVrfRTRowStatus": ["active(1)", "active(1)"],
505 "mplsL3VpnVrfRTStorageType": ["volatile(2)", "volatile(2)"],
509 def test_r1_mplsvpn_VrfRT_table():
512 r1
= tgen
.gears
["r1"]
514 r1_snmp
= SnmpTester(r1
, "10.1.1.1", "public", "2c")
517 oids
.append(generate_vrf_index_type_oid("VRF-a", 1, 3))
518 oids
.append(generate_vrf_index_type_oid("VRF-b", 1, 3))
521 for item
in rt_table_test
.keys():
523 assertmsg
= "{} should be {} oids {} full dict {}:".format(
524 item
, rt_table_test
[item
], oids
, r1_snmp
.walk(item
)
526 assert r1_snmp
.test_oid_walk(item
, rt_table_test
[item
], oids
), assertmsg
529 def test_r1_mplsvpn_perf_table():
532 r1
= tgen
.gears
["r1"]
534 r1_snmp
= SnmpTester(r1
, "10.1.1.1", "public", "2c")
537 oid_a
= snmp_str_to_oid("VRF-a")
538 oid_b
= snmp_str_to_oid("VRF-b")
540 # poll for 10 seconds for routes to appear
544 if r1_snmp
.test_oid_walk(
545 "mplsL3VpnVrfPerfCurrNumRoutes.{}".format(oid_a
), ["7"]
552 assertmsg
= "mplsL3VpnVrfPerfCurrNumRoutes shouold be 7 got {}".format(
553 r1_snmp
.get("mplsL3VpnVrfPerfCurrNumRoutes.{}".format(oid_a
))
555 assert passed
, assertmsg
556 curr_a
= int(r1_snmp
.get("mplsL3VpnVrfPerfCurrNumRoutes.{}".format(oid_a
)))
557 del_a
= int(r1_snmp
.get("mplsL3VpnVrfPerfRoutesDeleted.{}".format(oid_a
)))
558 add_a
= int(r1_snmp
.get("mplsL3VpnVrfPerfRoutesAdded.{}".format(oid_a
)))
560 assertmsg
= "FAIL curr{} does not equal added{} - deleted {}".format(
563 assert curr_a
== (add_a
- del_a
), assertmsg
564 curr_b
= int(r1_snmp
.get("mplsL3VpnVrfPerfCurrNumRoutes.{}".format(oid_b
)))
565 del_b
= int(r1_snmp
.get("mplsL3VpnVrfPerfRoutesDeleted.{}".format(oid_b
)))
566 add_b
= int(r1_snmp
.get("mplsL3VpnVrfPerfRoutesAdded.{}".format(oid_b
)))
567 assertmsg
= "FAIL curr{} does not equal added{} - deleted {}".format(
570 assert curr_b
== (add_b
- del_b
), assertmsg
574 "mplsL3VpnVrfRteInetCidrDestType": [
583 "mplsL3VpnVrfRteInetCidrDest": [
592 "mplsL3VpnVrfRteInetCidrPfxLen": ["32", "32", "24", "24", "24", "24", "24"],
593 "mplsL3VpnVrfRteInetCidrNHopType": [
602 "mplsL3VpnVrfRteInetCidrNextHop": [
611 "mplsL3VpnVrfRteInetCidrType": [
620 "mplsL3VpnVrfRteInetCidrProto": [
629 "mplsL3VpnVrfRteInetCidrNextHopAS": [
638 "mplsL3VpnVrfRteInetCidrMetric1": ["0", "0", "20", "0", "0", "0", "0"],
639 "mplsL3VpnVrfRteInetCidrMetric2": ["-1", "-1", "-1", "-1", "-1", "-1", "-1"],
640 "mplsL3VpnVrfRteInetCidrMetric3": ["-1", "-1", "-1", "-1", "-1", "-1", "-1"],
641 "mplsL3VpnVrfRteInetCidrMetric4": ["-1", "-1", "-1", "-1", "-1", "-1", "-1"],
642 "mplsL3VpnVrfRteInetCidrMetric5": ["-1", "-1", "-1", "-1", "-1", "-1", "-1"],
643 "mplsL3VpnVrfRteXCPointer": ["00", "00", "00", "00", "00", "00", "00"],
644 "mplsL3VpnVrfRteInetCidrStatus": [
656 def test_r1_mplsvpn_rte_table():
659 r1
= tgen
.gears
["r1"]
661 r1_snmp
= SnmpTester(r1
, "10.1.1.1", "public", "2c")
664 oid_1
= snmp_rte_oid("VRF-a", 1, "10.5.5.5", 32, 0, 1, "192.168.100.10")
665 oid_2
= snmp_rte_oid("VRF-a", 1, "10.7.7.7", 32, 0, 1, "192.168.200.10")
666 oid_3
= snmp_rte_oid("VRF-a", 1, "192.168.34.0", 24, 0, 1, "10.4.4.4")
667 oid_4
= snmp_rte_oid("VRF-a", 1, "192.168.100.0", 24, 1, 1, "192.168.100.10")
668 oid_4_a
= snmp_rte_oid("VRF-a", 1, "192.168.100.0", 24, 0, 1, "192.168.100.10")
669 oid_5
= snmp_rte_oid("VRF-a", 1, "192.168.100.0", 24, 0, 0)
670 oid_5_a
= snmp_rte_oid("VRF-a", 1, "192.168.100.0", 24, 1, 0)
671 oid_6
= snmp_rte_oid("VRF-a", 1, "192.168.200.0", 24, 1, 1, "192.168.200.10")
672 oid_6_a
= snmp_rte_oid("VRF-a", 1, "192.168.200.0", 24, 0, 1, "192.168.200.10")
673 oid_7
= snmp_rte_oid("VRF-a", 1, "192.168.200.0", 24, 0, 0)
674 oid_7_a
= snmp_rte_oid("VRF-a", 1, "192.168.200.0", 24, 1, 0)
677 [oid_1
, oid_2
, oid_3
, oid_4
, oid_5
, oid_6
, oid_7
],
678 [oid_1
, oid_2
, oid_3
, oid_4_a
, oid_5_a
, oid_6
, oid_7
],
679 [oid_1
, oid_2
, oid_3
, oid_4
, oid_5
, oid_6_a
, oid_7_a
],
680 [oid_1
, oid_2
, oid_3
, oid_4_a
, oid_5_a
, oid_6_a
, oid_7_a
],
681 [oid_1
, oid_2
, oid_3
, oid_4
, oid_5
, oid_6
, oid_7
],
682 [oid_1
, oid_2
, oid_3
, oid_4_a
, oid_5_a
, oid_6
, oid_7
],
683 [oid_1
, oid_2
, oid_3
, oid_4
, oid_5
, oid_6_a
, oid_7_a
],
684 [oid_1
, oid_2
, oid_3
, oid_4_a
, oid_5_a
, oid_6_a
, oid_7_a
],
690 for oid_list
in oid_lists
:
692 for item
in rte_table_test
.keys():
694 assertmsg
= "{} should be {} oids {} full dict {}:".format(
695 item
, rte_table_test
[item
], oid_list
, r1_snmp
.walk(item
)
697 if not r1_snmp
.test_oid_walk(item
, rte_table_test
[item
], oid_list
):
701 "{} should be {} oids {} full dict {}:".format(
702 item
, rte_table_test
[item
], oid_list
, r1_snmp
.walk(item
)
707 # generate ifindex row grabbing ifindices from vtysh
710 router_interface_get_ifindex(r1
, "eth3"),
711 router_interface_get_ifindex(r1
, "eth4"),
712 router_interface_get_ifindex(r1
, "eth2"),
713 router_interface_get_ifindex(r1
, "eth3"),
715 router_interface_get_ifindex(r1
, "eth4"),
718 if not r1_snmp
.test_oid_walk(
719 "mplsL3VpnVrfRteInetCidrIfIndex", ifindex_row
, oid_list
723 print("passed {}".format(passed
))
724 assert passed
, assertmsg
727 def test_memory_leak():
728 "Run the memory leak test and report results."
730 if not tgen
.is_memleak_enabled():
731 pytest
.skip("Memory leak test/report is disabled")
733 tgen
.report_memory_leaks()
736 if __name__
== "__main__":
737 args
= ["-s"] + sys
.argv
[1:]
738 sys
.exit(pytest
.main(args
))