]>
git.proxmox.com Git - mirror_frr.git/blob - tests/topotests/grpc_basic/test_basic_grpc.py
1 # -*- coding: utf-8 eval: (blacken-mode 1) -*-
2 # SPDX-License-Identifier: GPL-2.0-or-later
4 # February 21 2022, Christian Hopps <chopps@labn.net>
6 # Copyright (c) 2022, LabN Consulting, L.L.C.
9 test_basic_grpc.py: Test Basic gRPC.
18 from lib
.common_config
import step
19 from lib
.micronet
import commander
20 from lib
.topogen
import Topogen
, TopoRouter
21 from lib
.topolog
import logger
23 CWD
= os
.path
.dirname(os
.path
.realpath(__file__
))
33 # pytest.mark.mgmtd -- Need a new non-protocol marker
41 script_path
= os
.path
.realpath(os
.path
.join(CWD
, "../lib/grpc-query.py"))
44 commander
.cmd_raises([script_path
, "--check"])
47 "skipping; cannot create or import gRPC proto modules", allow_module_level
=True
51 @pytest.fixture(scope
="module")
53 "Setup/Teardown the environment and provide tgen argument to tests"
54 topodef
= {"s1": ("r1", "r2")}
55 tgen
= Topogen(topodef
, request
.module
.__name
__)
58 router_list
= tgen
.routers()
60 for rname
, router
in router_list
.items():
61 router
.load_config(TopoRouter
.RD_ZEBRA
, "zebra.conf", f
"-M grpc:{GRPCP_ZEBRA}")
62 router
.load_config(TopoRouter
.RD_STATIC
, None, f
"-M grpc:{GRPCP_STATICD}")
63 # router.load_config(TopoRouter.RD_BFD, None, f"-M grpc:{GRPCP_BFDD}")
64 # router.load_config(TopoRouter.RD_ISIS, None, f"-M grpc:{GRPCP_ISISD}")
65 # router.load_config(TopoRouter.RD_OSPF, None, f"-M grpc:{GRPCP_OSPFD}")
66 # router.load_config(TopoRouter.RD_PIM, None, f"-M grpc:{GRPCP_PIMD}")
71 logging
.info("Stopping all routers (no assert on error)")
75 # Let's not do this so we catch errors
76 # Fixture that executes before each test
77 @pytest.fixture(autouse
=True)
78 def skip_on_failure(tgen
):
79 if tgen
.routers_have_failure():
80 pytest
.skip("skipped because of previous test failure")
88 def run_grpc_client(r
, port
, commands
):
89 if not isinstance(commands
, str):
90 commands
= "\n".join(commands
) + "\n"
91 if not commands
.endswith("\n"):
93 return r
.cmd_raises([script_path
, f
"--port={port}"], stdin
=commands
)
96 def test_connectivity(tgen
):
98 output
= r1
.cmd_raises("ping -c1 192.168.1.2")
99 logging
.info("ping output: %s", output
)
102 def test_capabilities(tgen
):
103 r1
= tgen
.gears
["r1"]
104 output
= run_grpc_client(r1
, GRPCP_ZEBRA
, "GETCAP")
105 logging
.info("grpc output: %s", output
)
108 def test_get_config(tgen
):
110 r1
= tgen
.gears
["r1"]
112 step("'GET' interface config 10 times, once per invocation")
114 for i
in range(0, nrepeat
):
115 output
= run_grpc_client(r1
, GRPCP_ZEBRA
, "GET,/frr-interface:lib")
116 logging
.info("[iteration %s]: grpc GET output: %s", i
, output
)
118 step(f
"'GET' YANG {nrepeat} times in one invocation")
119 commands
= ["GET,/frr-interface:lib" for _
in range(0, 10)]
120 output
= run_grpc_client(r1
, GRPCP_ZEBRA
, commands
)
121 logging
.info("grpc GET*{%d} output: %s", nrepeat
, output
)
124 def test_get_vrf_config(tgen
):
125 r1
= tgen
.gears
["r1"]
127 step("'GET' get VRF config")
129 output
= run_grpc_client(r1
, GRPCP_ZEBRA
, "GET,/frr-vrf:lib")
130 logging
.info("grpc GET /frr-vrf:lib output: %s", output
)
133 def test_shutdown_checks(tgen
):
134 # Start a process rnuning that will fetch bunches of data then shut the routers down
135 # and check for cores.
137 r1
= tgen
.gears
["r1"]
138 commands
= ["GET,/frr-interface:lib" for _
in range(0, nrepeat
)]
139 p
= r1
.popen([script_path
, f
"--port={GRPCP_ZEBRA}"] + commands
)
144 for r
in tgen
.routers().values():
146 r
.net
.checkRouterCores()
153 # Memory leak test template
154 # Not compatible with the shutdown check above
155 def _test_memory_leak(tgen
):
156 "Run the memory leak test and report results."
158 if not tgen
.is_memleak_enabled():
159 pytest
.skip("Memory leak test/report is disabled")
161 tgen
.report_memory_leaks()
164 if __name__
== "__main__":
165 args
= ["-s"] + sys
.argv
[1:]
166 sys
.exit(pytest
.main(args
))