4 # test_zebra_netlink.py
6 # Copyright (c) 2020 by
8 # Permission to use, copy, modify, and/or distribute this software
9 # for any purpose with or without fee is hereby granted, provided
10 # that the above copyright notice and this permission notice appear
13 # THE SOFTWARE IS PROVIDED "AS IS" AND NETDEF DISCLAIMS ALL WARRANTIES
14 # WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
15 # MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NETDEF BE LIABLE FOR
16 # ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY
17 # DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS,
18 # WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
19 # ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE
24 test_zebra_netlink.py: Test some basic interactions with kernel using Netlink
34 from functools
import partial
36 # Save the Current Working Directory to find configuration files.
37 CWD
= os
.path
.dirname(os
.path
.realpath(__file__
))
38 sys
.path
.append(os
.path
.join(CWD
, "../"))
40 # pylint: disable=C0413
41 # Import topogen and topotest helpers
42 from lib
import topotest
43 from lib
.topogen
import Topogen
, TopoRouter
, get_topogen
44 from lib
.topolog
import logger
45 from lib
.common_config
import shutdown_bringup_interface
47 # Required to instantiate the topology builder class.
48 from mininet
.topo
import Topo
50 pytestmark
= [pytest
.mark
.sharpd
]
53 #####################################################
55 ## Network Topology Definition
57 #####################################################
60 class ZebraTopo(Topo
):
61 "Test topology builder"
63 def build(self
, *_args
, **_opts
):
65 tgen
= get_topogen(self
)
69 # Create a empty network for router 1
70 switch
= tgen
.add_switch("s1")
71 switch
.add_link(tgen
.gears
["r1"])
74 #####################################################
78 #####################################################
81 def setup_module(mod
):
82 "Sets up the pytest environment"
83 tgen
= Topogen(ZebraTopo
, mod
.__name
__)
86 router_list
= tgen
.routers()
87 for rname
, router
in router_list
.items():
89 TopoRouter
.RD_ZEBRA
, os
.path
.join(CWD
, "{}/zebra.conf".format(rname
))
93 TopoRouter
.RD_SHARP
, os
.path
.join(CWD
, "{}/sharpd.conf".format(rname
))
96 # Initialize all routers.
100 def teardown_module(_mod
):
101 "Teardown the pytest environment"
104 # This function tears down the whole topology.
108 def test_zebra_netlink_batching():
109 "Test the situation where dataplane fills netlink send buffer entirely."
111 "Test the situation where dataplane fills netlink send buffer entirely."
114 if tgen
.routers_have_failure():
115 pytest
.skip("skipped because of previous test failure")
116 r1
= tgen
.gears
["r1"]
118 # Reduce the size of the buffer to hit the limit.
119 r1
.vtysh_cmd("conf t\nzebra kernel netlink batch-tx-buf 256 256")
121 r1
.vtysh_cmd("sharp install routes 2.1.3.7 nexthop 192.168.1.1 100")
122 json_file
= "{}/r1/v4_route.json".format(CWD
)
123 expected
= json
.loads(open(json_file
).read())
125 topotest
.router_json_cmp
,
127 "show ip route json",
130 _
, result
= topotest
.run_and_expect(test_func
, None, count
=2, wait
=0.5)
131 assertmsg
= '"r1" JSON output mismatches'
132 assert result
is None, assertmsg
134 r1
.vtysh_cmd("sharp remove routes 2.1.3.7 100")
137 if __name__
== "__main__":
138 args
= ["-s"] + sys
.argv
[1:]
139 sys
.exit(pytest
.main(args
))