import os
import sys
import pytest
+import json
+from functools import partial
CWD = os.path.dirname(os.path.realpath(__file__))
sys.path.append(os.path.join(CWD, '../'))
for routern in range(1, 3):
tgen.add_router('r{}'.format(routern))
+ tgen.add_router('rp')
+
+ # r1 -> .1
+ # r2 -> .2
+ # rp -> .3
+ # loopback network is 10.254.0.X/32
+ #
# r1 <- sw1 -> r2
+ # r1-eth0 <-> r2-eth0
+ # 10.0.20.0/24
sw = tgen.add_switch('sw1')
sw.add_link(tgen.gears['r1'])
sw.add_link(tgen.gears['r2'])
+ # r1 <- sw2 -> rp
+ # r1-eth1 <-> rp-eth0
+ # 10.0.30.0/24
+ sw = tgen.add_switch('sw2')
+ sw.add_link(tgen.gears['r1'])
+ sw.add_link(tgen.gears['rp'])
+
def setup_module(mod):
"Sets up the pytest environment"
TopoRouter.RD_PIM,
os.path.join(CWD, '{}/pimd.conf'.format(rname))
)
+ router.load_config(
+ TopoRouter.RD_BGP,
+ os.path.join(CWD, '{}/bgpd.conf'.format(rname))
+ )
# After loading the configurations, this function loads configured daemons.
tgen.start_router()
+ #tgen.mininet_cli()
def teardown_module(mod):
# This function tears down the whole topology.
tgen.stop_topology()
+def test_pim_rp_setup():
+ "Ensure basic routing has come up and the rp has an outgoing interface"
+ #Ensure rp and r1 establish pim neighbor ship and bgp has come up
+ #Finally ensure that the rp has an outgoing interface on r1
+ tgen = get_topogen()
+
+ r1 = tgen.gears['r1']
+ json_file = '{}/{}/rp-info.json'.format(CWD, r1.name)
+ expected = json.loads(open(json_file).read())
+
+ test_func = partial(topotest.router_json_cmp,
+ r1, 'show ip pim rp-info json', expected)
+ _, result = topotest.run_and_expect(test_func, None, count=15, wait=5)
+ assertmsg = '"{}" JSON output mismatches'.format(r1.name)
+ assert result is None, assertmsg
+ #tgen.mininet_cli()
def test_pim_send_mcast_stream():
"Establish a Multicast stream from r2 -> r1 and then ensure S,G is created as appropriate"
if tgen.routers_have_failure():
pytest.skip(tgen.errors)
+ rp = tgen.gears['rp']
r2 = tgen.gears['r2']
r1 = tgen.gears['r1']
}
assert topotest.json_cmp(out, expected) is None, 'failed to converge pim'
+ #tgen.mininet_cli()
+
+def test_pim_rp_sees_stream():
+ "Ensure that the RP sees the stream and has acted accordingly"
+ tgen = get_topogen()
+
+ rp = tgen.gears['rp']
+ json_file = '{}/{}/upstream.json'.format(CWD, rp.name)
+ expected = json.loads(open(json_file).read())
+ test_func = partial(topotest.router_json_cmp,
+ rp, 'show ip pim upstream json', expected)
+ _, result = topotest.run_and_expect(test_func, None, count=20, wait=.5)
+ assertmsg = '"{}" JSON output mismatches'.format(rp.name)
+ assert result is None, assertmsg
def test_pim_igmp_report():
"Send a igmp report from r2->r1 and ensure that the *,G state is created on r1"