]>
git.proxmox.com Git - mirror_frr.git/blob - tests/topotests/pim_basic/test_pim.py
2 # SPDX-License-Identifier: ISC
7 # Copyright (c) 2018 Cumulus Networks, Inc.
19 from functools
import partial
21 pytestmark
= pytest
.mark
.pimd
23 CWD
= os
.path
.dirname(os
.path
.realpath(__file__
))
24 sys
.path
.append(os
.path
.join(CWD
, "../"))
26 # pylint: disable=C0413
27 from lib
import topotest
28 from lib
.topogen
import Topogen
, TopoRouter
, get_topogen
29 from lib
.topolog
import logger
32 pytestmark
= [pytest
.mark
.pimd
]
38 for routern
in range(1, 4):
39 tgen
.add_router("r{}".format(routern
))
43 # rp ------ r1 -------- r2
50 # loopback network is 10.254.0.X/32
55 sw
= tgen
.add_switch("sw1")
56 sw
.add_link(tgen
.gears
["r1"])
57 sw
.add_link(tgen
.gears
["r2"])
62 sw
= tgen
.add_switch("sw2")
63 sw
.add_link(tgen
.gears
["r1"])
64 sw
.add_link(tgen
.gears
["rp"])
67 sw
= tgen
.add_switch("sw3")
68 sw
.add_link(tgen
.gears
["r1"])
69 sw
.add_link(tgen
.gears
["r3"])
72 def setup_module(mod
):
73 "Sets up the pytest environment"
74 tgen
= Topogen(build_topo
, mod
.__name
__)
77 # For all registered routers, load the zebra configuration file
78 for rname
, router
in tgen
.routers().items():
80 TopoRouter
.RD_ZEBRA
, os
.path
.join(CWD
, "{}/zebra.conf".format(rname
))
83 TopoRouter
.RD_PIM
, os
.path
.join(CWD
, "{}/pimd.conf".format(rname
))
86 TopoRouter
.RD_BGP
, os
.path
.join(CWD
, "{}/bgpd.conf".format(rname
))
89 # After loading the configurations, this function loads configured daemons.
94 def teardown_module(mod
):
95 "Teardown the pytest environment"
98 # This function tears down the whole topology.
102 def test_pim_rp_setup():
103 "Ensure basic routing has come up and the rp has an outgoing interface"
104 # Ensure rp and r1 establish pim neighbor ship and bgp has come up
105 # Finally ensure that the rp has an outgoing interface on r1
108 r1
= tgen
.gears
["r1"]
109 json_file
= "{}/{}/rp-info.json".format(CWD
, r1
.name
)
110 expected
= json
.loads(open(json_file
).read())
113 topotest
.router_json_cmp
, r1
, "show ip pim rp-info json", expected
115 _
, result
= topotest
.run_and_expect(test_func
, None, count
=30, wait
=1)
116 assertmsg
= '"{}" JSON output mismatches'.format(r1
.name
)
117 assert result
is None, assertmsg
121 def test_pim_send_mcast_stream():
122 "Establish a Multicast stream from r2 -> r1 and then ensure S,G is created as appropriate"
123 logger
.info("Establish a Mcast stream from r2->r1 and then ensure S,G created")
127 if tgen
.routers_have_failure():
128 pytest
.skip(tgen
.errors
)
130 rp
= tgen
.gears
["rp"]
131 r3
= tgen
.gears
["r3"]
132 r2
= tgen
.gears
["r2"]
133 r1
= tgen
.gears
["r1"]
135 # Let's establish a S,G stream from r2 -> r1
136 CWD
= os
.path
.dirname(os
.path
.realpath(__file__
))
138 "{}/mcast-tx.py --ttl 5 --count 40 --interval 2 229.1.1.1 r2-eth0 > /tmp/bar".format(
144 "{}/mcast-tx.py --ttl 5 --count 40 --interval 2 229.1.1.1 r3-eth0 > /tmp/bar".format(
149 # Let's see that it shows up and we have established some basic state
150 out
= r1
.vtysh_cmd("show ip pim upstream json", isjson
=True)
155 "joinState": "NotJoined",
156 "regState": "RegPrune",
157 "inboundInterface": "r1-eth0",
163 topotest
.router_json_cmp
, r1
, "show ip pim upstream json", expected
165 _
, result
= topotest
.run_and_expect(test_func
, None, count
=40, wait
=1)
166 assert result
is None, "failed to converge pim"
170 def test_pim_rp_sees_stream():
171 "Ensure that the RP sees the stream and has acted accordingly"
174 rp
= tgen
.gears
["rp"]
175 json_file
= "{}/{}/upstream.json".format(CWD
, rp
.name
)
176 expected
= json
.loads(open(json_file
).read())
179 topotest
.router_json_cmp
, rp
, "show ip pim upstream json", expected
181 _
, result
= topotest
.run_and_expect(test_func
, None, count
=40, wait
=1)
182 assertmsg
= '"{}" JSON output mismatches'.format(rp
.name
)
183 assert result
is None, assertmsg
186 def test_pim_igmp_report():
187 "Send a igmp report from r2->r1 and ensure that the *,G state is created on r1"
188 logger
.info("Send a igmp report from r2-r1 and ensure *,G created")
192 if tgen
.routers_have_failure():
193 pytest
.skip(tgen
.errors
)
195 r2
= tgen
.gears
["r2"]
196 r1
= tgen
.gears
["r1"]
198 # Let's send a igmp report from r2->r1
199 cmd
= [os
.path
.join(CWD
, "mcast-rx.py"), "229.1.1.2", "r2-eth0"]
206 "joinState": "Joined",
207 "regState": "RegNoInfo",
213 topotest
.router_json_cmp
, r1
, "show ip pim upstream json", expected
215 _
, result
= topotest
.run_and_expect(test_func
, None, count
=40, wait
=1)
216 assertmsg
= '"{}" JSON output mismatches'.format(r1
.name
)
217 assert result
is None, assertmsg
224 def test_memory_leak():
225 "Run the memory leak test and report results."
227 if not tgen
.is_memleak_enabled():
228 pytest
.skip("Memory leak test/report is disabled")
230 tgen
.report_memory_leaks()
233 if __name__
== "__main__":
234 args
= ["-s"] + sys
.argv
[1:]
235 sys
.exit(pytest
.main(args
))