]>
git.proxmox.com Git - mirror_frr.git/blob - tests/topotests/evpn_pim_1/test_evpn_pim_topo1.py
2 # SPDX-License-Identifier: ISC
5 # test_evpn-pim_topo1.py
7 # Copyright (c) 2017 by
8 # Cumulus Networks, Inc.
13 test_evpn_pim_topo1.py: Testing evpn-pim
21 from functools
import partial
23 pytestmark
= [pytest
.mark
.pimd
]
25 # Save the Current Working Directory to find configuration files.
26 CWD
= os
.path
.dirname(os
.path
.realpath(__file__
))
27 sys
.path
.append(os
.path
.join(CWD
, "../"))
29 # pylint: disable=C0413
30 # Import topogen and topotest helpers
31 from lib
import topotest
32 from lib
.topogen
import Topogen
, TopoRouter
, get_topogen
33 from lib
.topolog
import logger
35 # Required to instantiate the topology builder class.
37 pytestmark
= [pytest
.mark
.bgpd
, pytest
.mark
.bgpd
]
40 #####################################################
42 ## Network Topology Definition
44 #####################################################
48 tgen
.add_router("spine")
49 tgen
.add_router("leaf1")
50 tgen
.add_router("leaf2")
51 tgen
.add_router("host1")
52 tgen
.add_router("host2")
55 # First switch is for a dummy interface (for local network)
56 # spine-eth0 is connected to leaf1-eth0
57 switch
= tgen
.add_switch("sw1")
58 switch
.add_link(tgen
.gears
["spine"])
59 switch
.add_link(tgen
.gears
["leaf1"])
61 # spine-eth1 is connected to leaf2-eth0
62 switch
= tgen
.add_switch("sw2")
63 switch
.add_link(tgen
.gears
["spine"])
64 switch
.add_link(tgen
.gears
["leaf2"])
66 # leaf1-eth1 is connected to host1-eth0
67 switch
= tgen
.add_switch("sw3")
68 switch
.add_link(tgen
.gears
["leaf1"])
69 switch
.add_link(tgen
.gears
["host1"])
71 # leaf2-eth1 is connected to host2-eth0
72 switch
= tgen
.add_switch("sw4")
73 switch
.add_link(tgen
.gears
["leaf2"])
74 switch
.add_link(tgen
.gears
["host2"])
77 #####################################################
81 #####################################################
84 def setup_module(module
):
86 tgen
= Topogen(build_topo
, module
.__name
__)
89 leaf1
= tgen
.gears
["leaf1"]
90 leaf2
= tgen
.gears
["leaf2"]
92 leaf1
.run("brctl addbr brleaf1")
93 leaf2
.run("brctl addbr brleaf2")
94 leaf1
.run("ip link set dev brleaf1 up")
95 leaf2
.run("ip link set dev brleaf2 up")
97 "ip link add vxlan0 type vxlan id 42 group 239.1.1.1 dev leaf1-eth1 dstport 4789"
100 "ip link add vxlan0 type vxlan id 42 group 239.1.1.1 dev leaf2-eth1 dstport 4789"
102 leaf1
.run("brctl addif brleaf1 vxlan0")
103 leaf2
.run("brctl addif brleaf2 vxlan0")
104 leaf1
.run("ip link set up dev vxlan0")
105 leaf2
.run("ip link set up dev vxlan0")
107 # This is a sample of configuration loading.
108 router_list
= tgen
.routers()
109 for rname
, router
in router_list
.items():
111 TopoRouter
.RD_ZEBRA
, os
.path
.join(CWD
, "{}/zebra.conf".format(rname
))
114 TopoRouter
.RD_BGP
, os
.path
.join(CWD
, "{}/bgpd.conf".format(rname
))
117 TopoRouter
.RD_PIM
, os
.path
.join(CWD
, "{}/pimd.conf".format(rname
))
123 def teardown_module(_mod
):
124 "Teardown the pytest environment"
127 # This function tears down the whole topology.
131 def test_converge_protocols():
132 "Wait for protocol convergence"
135 # Don't run this test if we have any failure.
136 if tgen
.routers_have_failure():
137 pytest
.skip(tgen
.errors
)
139 spine
= tgen
.gears
["spine"]
140 json_file
= "{}/{}/bgp.summ.json".format(CWD
, spine
.name
)
141 expected
= json
.loads(open(json_file
).read())
144 topotest
.router_json_cmp
, spine
, "show bgp ipv4 uni summ json", expected
146 _
, result
= topotest
.run_and_expect(test_func
, None, count
=125, wait
=1)
147 assertmsg
= '"{}" JSON output mismatches'.format(spine
.name
)
148 assert result
is None, assertmsg
152 def test_multicast_groups_on_rp():
153 "Ensure the multicast groups show up on the spine"
154 # This test implicitly tests the auto mcast groups
155 # of the created vlans and then the auto-joins that
156 # pim will do to the RP( spine )
160 if tgen
.routers_have_failure():
161 pytest
.skip(tgen
.errors
)
163 spine
= tgen
.gears
["spine"]
164 json_file
= "{}/{}/join-info.json".format(CWD
, spine
.name
)
165 expected
= json
.loads(open(json_file
).read())
168 topotest
.router_json_cmp
, spine
, "show ip pim join json", expected
170 _
, result
= topotest
.run_and_expect(test_func
, None, count
=30, wait
=1)
171 assertmsg
= '"{}" JSON output mismatches'.format(spine
.name
)
172 assert result
is None, assertmsg
176 def test_shutdown_check_stderr():
177 if os
.environ
.get("TOPOTESTS_CHECK_STDERR") is None:
178 pytest
.skip("Skipping test for Stderr output and memory leaks")
181 # Don't run this test if we have any failure.
182 if tgen
.routers_have_failure():
183 pytest
.skip(tgen
.errors
)
185 logger
.info("Verifying unexpected STDERR output from daemons")
187 router_list
= tgen
.routers().values()
188 for router
in router_list
:
191 log
= tgen
.net
[router
.name
].getStdErr("pimd")
193 logger
.error("PIMd StdErr Log:" + log
)
194 log
= tgen
.net
[router
.name
].getStdErr("bgpd")
196 logger
.error("BGPd StdErr Log:" + log
)
197 log
= tgen
.net
[router
.name
].getStdErr("zebra")
199 logger
.error("Zebra StdErr Log:" + log
)
202 if __name__
== "__main__":
203 args
= ["-s"] + sys
.argv
[1:]
204 sys
.exit(pytest
.main(args
))