# Part of NetDEF Topology Tests
#
# Copyright (c) 2019 by
-# Network Device Education Foundation, Inc. ("NetDEF")
+# Donatas Abraitis <donatas.abraitis@gmail.com>
#
# Permission to use, copy, modify, and/or distribute this software
# for any purpose with or without fee is hereby granted, provided
import json
import time
import pytest
+import functools
CWD = os.path.dirname(os.path.realpath(__file__))
sys.path.append(os.path.join(CWD, '../'))
def build(self, *_args, **_opts):
tgen = get_topogen(self)
- for routern in range(1, 5):
+ for routern in range(1, 7):
tgen.add_router('r{}'.format(routern))
switch = tgen.add_switch('s1')
switch.add_link(tgen.gears['r3'])
switch.add_link(tgen.gears['r4'])
+ switch = tgen.add_switch('s3')
+ switch.add_link(tgen.gears['r5'])
+ switch.add_link(tgen.gears['r6'])
+
def setup_module(mod):
tgen = Topogen(TemplateTopo, mod.__name__)
tgen.start_topology()
tgen = get_topogen()
tgen.stop_topology()
-def test_bgp_remove_private_as():
+def test_ebgp_requires_policy():
tgen = get_topogen()
if tgen.routers_have_failure():
pytest.skip(tgen.errors)
def _bgp_converge(router):
- while True:
- cmd = "show ip bgp neighbor 192.168.255.1 json"
- output = json.loads(tgen.gears[router].vtysh_cmd(cmd))
- if output['192.168.255.1']['bgpState'] == 'Established':
- time.sleep(3)
- return True
-
- def _bgp_ebgp_requires_policy(router):
- cmd = "show ip bgp 172.16.255.254/32 json"
- output = json.loads(tgen.gears[router].vtysh_cmd(cmd))
- if 'prefix' in output:
- return True
- return False
-
- if _bgp_converge('r2'):
- assert _bgp_ebgp_requires_policy('r2') == True
-
- if _bgp_converge('r4'):
- assert _bgp_ebgp_requires_policy('r4') == False
+ output = json.loads(tgen.gears[router].vtysh_cmd("show ip bgp neighbor 192.168.255.1 json"))
+ expected = {
+ '192.168.255.1': {
+ 'bgpState': 'Established'
+ }
+ }
+ return topotest.json_cmp(output, expected)
+
+ def _bgp_has_routes(router):
+ output = json.loads(tgen.gears[router].vtysh_cmd("show ip bgp neighbor 192.168.255.1 routes json"))
+ expected = {
+ 'routes': {
+ '172.16.255.254/32': [
+ {
+ 'valid': True
+ }
+ ]
+ }
+ }
+ return topotest.json_cmp(output, expected)
+
+ test_func = functools.partial(_bgp_converge, 'r2')
+ success, result = topotest.run_and_expect(test_func, None, count=60, wait=0.5)
+ assert success is True, 'Failed bgp convergence (r2) in "{}"'.format(router)
+
+ test_func = functools.partial(_bgp_has_routes, 'r2')
+ success, result = topotest.run_and_expect(test_func, None, count=60, wait=0.5)
+ assert success is True, 'eBGP policy is not working (r2) in "{}"'.format(router)
+
+ test_func = functools.partial(_bgp_converge, 'r4')
+ success, result = topotest.run_and_expect(test_func, None, count=60, wait=0.5)
+ assert success is True, 'Failed bgp convergence (r4) in "{}"'.format(router)
+
+ test_func = functools.partial(_bgp_has_routes, 'r4')
+ success, result = topotest.run_and_expect(test_func, None, count=60, wait=0.5)
+ assert success is False, 'eBGP policy is not working (r4) in "{}"'.format(router)
+
+ test_func = functools.partial(_bgp_converge, 'r6')
+ success, result = topotest.run_and_expect(test_func, None, count=60, wait=0.5)
+ assert success is True, 'Failed bgp convergence (r6) in "{}"'.format(router)
+
+ test_func = functools.partial(_bgp_has_routes, 'r6')
+ success, result = topotest.run_and_expect(test_func, None, count=60, wait=0.5)
+ assert success is True, 'eBGP policy is not working (r6) in "{}"'.format(router)
if __name__ == '__main__':
args = ["-s"] + sys.argv[1:]