4 # test_zebra_netlink.py
6 # Copyright (c) 2020 by
8 # Permission to use, copy, modify, and/or distribute this software
9 # for any purpose with or without fee is hereby granted, provided
10 # that the above copyright notice and this permission notice appear
13 # THE SOFTWARE IS PROVIDED "AS IS" AND NETDEF DISCLAIMS ALL WARRANTIES
14 # WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
15 # MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NETDEF BE LIABLE FOR
16 # ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY
17 # DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS,
18 # WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
19 # ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE
24 test_zebra_netlink.py: Test some basic interactions with kernel using Netlink
34 from functools
import partial
# Directory containing this test file; per-router configuration files and
# expected-output JSON are located relative to it.
CWD = os.path.dirname(os.path.realpath(__file__))

# Add the parent directory to the import path so that the shared `lib`
# package imported below can be resolved.
sys.path.append(os.path.join(CWD, "../"))
40 # pylint: disable=C0413
41 # Import topogen and topotest helpers
42 from lib
import topotest
43 from lib
.topogen
import Topogen
, TopoRouter
, get_topogen
44 from lib
.topolog
import logger
45 from lib
.common_config
import shutdown_bringup_interface
47 # Required to instantiate the topology builder class.
48 from mininet
.topo
import Topo
50 #####################################################
52 ## Network Topology Definition
54 #####################################################
class ZebraTopo(Topo):
    """Test topology builder: a single router "r1" attached to switch "s1"."""

    def build(self, *_args, **_opts):
        """Build the topology.

        Positional and keyword arguments are accepted for compatibility with
        the base class but are not used.
        """
        tgen = get_topogen(self)

        # Register the router first: it is looked up in tgen.gears below and
        # the lookup fails if "r1" has not been added yet.
        tgen.add_router("r1")

        # Create an empty network for router 1
        switch = tgen.add_switch("s1")
        switch.add_link(tgen.gears["r1"])
71 #####################################################
75 #####################################################
def setup_module(mod):
    """Sets up the pytest environment.

    Builds the ZebraTopo topology for the test module `mod`, loads the zebra
    and sharpd configuration of every router from `<CWD>/<router>/`, and
    starts the routers.
    """
    tgen = Topogen(ZebraTopo, mod.__name__)
    # Bring the emulated network up before loading any router configuration.
    tgen.start_topology()

    router_list = tgen.routers()
    # items() works on both Python 2 and 3; iteritems() exists only on 2.
    for rname, router in router_list.items():
        router.load_config(
            TopoRouter.RD_ZEBRA, os.path.join(CWD, "{}/zebra.conf".format(rname))
        )
        router.load_config(
            TopoRouter.RD_SHARP, os.path.join(CWD, "{}/sharpd.conf".format(rname))
        )

    # Initialize all routers.
    tgen.start_router()
def teardown_module(_mod):
    """Teardown the pytest environment.

    `_mod` is supplied by pytest and is not used.
    """
    tgen = get_topogen()

    # This function tears down the whole topology.
    tgen.stop_topology()
def test_zebra_netlink_batching():
    """Test the situation where dataplane fills netlink send buffer entirely.

    Shrinks zebra's netlink batch transmit buffer on r1, asks sharpd to
    install routes, and checks that `show ip route json` matches the
    expected content of `r1/v4_route.json`. The routes are removed again
    once the check has passed.
    """
    logger.info(
        "Test the situation where dataplane fills netlink send buffer entirely."
    )
    tgen = get_topogen()
    if tgen.routers_have_failure():
        pytest.skip("skipped because of previous test failure")
    r1 = tgen.gears["r1"]

    # Reduce the size of the buffer to hit the limit.
    r1.vtysh_cmd("conf t\nzebra kernel netlink batch-tx-buf 256 256")

    r1.vtysh_cmd("sharp install routes 2.1.3.7 nexthop 192.168.1.1 100")

    # Load the expected routing table; the context manager closes the file
    # even if parsing fails.
    json_file = "{}/r1/v4_route.json".format(CWD)
    with open(json_file) as route_file:
        expected = json.load(route_file)

    # Compare the router's JSON output with the expectation through the
    # topotest helper; a result of None means no difference was found.
    test_func = partial(topotest.router_json_cmp, r1, "show ip route json", expected)
    _, result = topotest.run_and_expect(test_func, None, count=2, wait=0.5)
    assertmsg = '"r1" JSON output mismatches'
    assert result is None, assertmsg

    r1.vtysh_cmd("sharp remove routes 2.1.3.7 100")
if __name__ == "__main__":
    # Allow running this file directly: hand control to pytest with "-s"
    # followed by whatever arguments were given on the command line, and
    # use pytest's result as the process exit status.
    pytest_args = ["-s"]
    pytest_args.extend(sys.argv[1:])
    sys.exit(pytest.main(pytest_args))