]>
git.proxmox.com Git - mirror_frr.git/blob - tests/topotests/grpc_basic/test_basic_grpc.py
1 # -*- coding: utf-8 eval: (blacken-mode 1) -*-
3 # February 21 2022, Christian Hopps <chopps@labn.net>
5 # Copyright (c) 2022, LabN Consulting, L.L.C.
7 # This program is free software; you can redistribute it and/or
8 # modify it under the terms of the GNU General Public License
9 # as published by the Free Software Foundation; either version 2
10 # of the License, or (at your option) any later version.
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
17 # You should have received a copy of the GNU General Public License along
18 # with this program; see the file COPYING; if not, write to the Free Software
19 # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
22 test_basic_grpc.py: Test Basic gRPC.
31 from lib
.common_config
import step
32 from lib
.micronet
import commander
33 from lib
.topogen
import Topogen
, TopoRouter
34 from lib
.topolog
import logger
# Directory containing this test module (symlinks resolved); used as the
# base for locating files relative to the test.
CWD = os.path.dirname(os.path.realpath(__file__))
46 # pytest.mark.mgmtd -- Need a new non-protocol marker
# Absolute, normalized path of the grpc-query.py helper that lives in the
# sibling "lib" directory one level above this test's directory.
script_path = os.path.realpath(os.path.join(CWD, "../lib/grpc-query.py"))
57 commander
.cmd_raises([script_path
, "--check"])
60 "skipping; cannot create or import gRPC proto modules", allow_module_level
=True
64 @pytest.fixture(scope
="module")
66 "Setup/Teardown the environment and provide tgen argument to tests"
67 topodef
= {"s1": ("r1", "r2")}
68 tgen
= Topogen(topodef
, request
.module
.__name
__)
71 router_list
= tgen
.routers()
73 for rname
, router
in router_list
.items():
74 router
.load_config(TopoRouter
.RD_ZEBRA
, "zebra.conf", f
"-M grpc:{GRPCP_ZEBRA}")
75 router
.load_config(TopoRouter
.RD_STATIC
, None, f
"-M grpc:{GRPCP_STATICD}")
76 # router.load_config(TopoRouter.RD_BFD, None, f"-M grpc:{GRPCP_BFDD}")
77 # router.load_config(TopoRouter.RD_ISIS, None, f"-M grpc:{GRPCP_ISISD}")
78 # router.load_config(TopoRouter.RD_OSPF, None, f"-M grpc:{GRPCP_OSPFD}")
79 # router.load_config(TopoRouter.RD_PIM, None, f"-M grpc:{GRPCP_PIMD}")
84 logging
.info("Stopping all routers (no assert on error)")
88 # Let's not do this so we catch errors
89 # Fixture that executes before each test
@pytest.fixture(autouse=True)
def skip_on_failure(tgen):
    """Skip the current test once the topology reports a router failure.

    Declared with ``autouse=True`` so it applies to every test in this
    module without being requested explicitly.

    Args:
        tgen: the module's topology fixture; only its
            ``routers_have_failure()`` method is used here.
    """
    if tgen.routers_have_failure():
        pytest.skip("skipped because of previous test failure")
def run_grpc_client(r, port, commands):
    """Run the gRPC query script on a router, feeding it commands on stdin.

    Args:
        r: object exposing ``cmd_raises(argv, stdin=...)``; the script is
            executed through it.
        port: gRPC port, passed to the script as ``--port=<port>``.
        commands: either a single string, or an iterable of command strings
            which are joined one per line.

    Returns:
        Whatever ``r.cmd_raises`` returns for the script invocation
        (presumably the command output -- confirm against the helper).
    """
    if not isinstance(commands, str):
        commands = "\n".join(commands) + "\n"
    # Make sure the final command is newline-terminated, then always run the
    # client.  The return must not be nested under this check, otherwise
    # input that already ends in a newline would never be sent.
    if not commands.endswith("\n"):
        commands += "\n"
    return r.cmd_raises([script_path, f"--port={port}"], stdin=commands)
def test_connectivity(tgen):
    """Send a single ping from r1 to 192.168.1.2 and log the output.

    The check relies on ``cmd_raises`` to signal a failed ping (presumably
    by raising on a non-zero exit status -- confirm against the helper);
    there is no explicit assertion here.
    """
    r1 = tgen.gears["r1"]
    output = r1.cmd_raises("ping -c1 192.168.1.2")
    logging.info("ping output: %s", output)
def test_capabilities(tgen):
    """Send a ``GETCAP`` command to the zebra gRPC port on r1 and log the reply.

    Success is determined by ``run_grpc_client`` completing without raising;
    the reply content itself is only logged, not asserted.
    """
    r1 = tgen.gears["r1"]
    output = run_grpc_client(r1, GRPCP_ZEBRA, "GETCAP")
    logging.info("grpc output: %s", output)
def test_get_config(tgen):
    """Fetch ``/frr-interface:lib`` over gRPC, first singly then batched.

    Phase one issues the GET ``nrepeat`` times with one client invocation
    per request.  Phase two sends ``nrepeat`` GET commands to a single
    client invocation.  Replies are logged, not asserted.
    """
    # NOTE(review): nrepeat is not defined in the visible part of this
    # function; it is presumably defined elsewhere in the file -- confirm.
    r1 = tgen.gears["r1"]
    query = "GET,/frr-interface:lib"

    # Report the real repeat count rather than a hard-coded number.
    step(f"'GET' interface config {nrepeat} times, once per invocation")

    for i in range(nrepeat):
        output = run_grpc_client(r1, GRPCP_ZEBRA, query)
        logging.info("[iteration %s]: grpc GET output: %s", i, output)

    step(f"'GET' YANG {nrepeat} times in one invocation")
    # Batch size follows nrepeat so it matches the step and log messages.
    commands = [query] * nrepeat
    output = run_grpc_client(r1, GRPCP_ZEBRA, commands)
    logging.info("grpc GET*{%d} output: %s", nrepeat, output)
def test_get_vrf_config(tgen):
    """Fetch ``/frr-vrf:lib`` from the zebra gRPC port on r1 and log the reply.

    Success is determined by ``run_grpc_client`` completing without raising;
    the reply content itself is only logged, not asserted.
    """
    r1 = tgen.gears["r1"]

    step("'GET' get VRF config")

    output = run_grpc_client(r1, GRPCP_ZEBRA, "GET,/frr-vrf:lib")
    logging.info("grpc GET /frr-vrf:lib output: %s", output)
146 def test_shutdown_checks(tgen
):
147 # Start a process running that will fetch bunches of data then shut the routers down
148 # and check for cores.
150 r1
= tgen
.gears
["r1"]
151 commands
= ["GET,/frr-interface:lib" for _
in range(0, nrepeat
)]
152 p
= r1
.popen([script_path
, f
"--port={GRPCP_ZEBRA}"] + commands
)
157 for r
in tgen
.routers().values():
159 r
.net
.checkRouterCores()
166 # Memory leak test template
167 # Not compatible with the shutdown check above
def _test_memory_leak(tgen):
    """Run the memory leak test and report results.

    The leading underscore keeps the function from matching pytest's
    default ``test_*`` collection pattern, so it acts as a template only.
    When memory-leak reporting is not enabled on ``tgen`` the test is
    skipped instead.
    """
    if not tgen.is_memleak_enabled():
        pytest.skip("Memory leak test/report is disabled")

    tgen.report_memory_leaks()
if __name__ == "__main__":
    # Allow running this module directly as a script: forward any extra
    # command-line arguments to pytest, prepending "-s", and exit with
    # pytest's return code.
    args = ["-s"] + sys.argv[1:]
    sys.exit(pytest.main(args))