]>
git.proxmox.com Git - mirror_frr.git/blob - tests/topotests/lib/grpc-query.py
2 # -*- coding: utf-8 eval: (blacken-mode 1) -*-
3 # SPDX-License-Identifier: MIT
5 # February 22 2022, Christian Hopps <chopps@labn.net>
7 # Copyright (c) 2022, LabN Consulting, L.L.C.
# Directory that holds this script; symlinks are resolved first so that paths
# built relative to it point at the real source tree.
CWD = os.path.dirname(os.path.realpath(__file__))
# This is painful but works if you have installed grpc and grpc_tools. It would be
# *way* better if we actually built and installed these, but ... python packaging.
24 from micronet
import commander
26 commander
.cmd_raises(f
"cp {CWD}/../../../grpc/frr-northbound.proto .")
28 f
"python -m grpc_tools.protoc --python_out=. --grpc_python_out=. -I . frr-northbound.proto"
30 except Exception as error
:
31 logging
.error("can't create proto definition modules %s", error
)
36 import frr_northbound_pb2
37 import frr_northbound_pb2_grpc
39 # Would be nice if compiling the modules internally from the source worked
40 # # import grpc_tools.protoc
41 # # proto_include = pkg_resources.resource_filename("grpc_tools", "_proto")
42 # from grpc_tools.protoc import _proto_file_to_module_name, _protos_and_services
44 # frr_northbound_pb2, frr_northbound_pb2_grpc = _protos_and_services(
45 # "frr_northbound.proto"
49 except Exception as error
:
50 logging
.error("can't import proto definition modules %s", error
)
def __init__(self, server, port):
    """Open an insecure gRPC channel to ``server:port`` and build a stub on it.

    The channel is stored on ``self.channel`` and the Northbound stub created
    from it on ``self.stub``.
    """
    target = f"{server}:{port}"
    self.channel = grpc.insecure_channel(target)
    self.stub = frr_northbound_pb2_grpc.NorthboundStub(self.channel)
59 def get_capabilities(self
):
60 request
= frr_northbound_pb2
.GetCapabilitiesRequest()
63 response
= self
.stub
.GetCapabilities(request
)
64 except Exception as error
:
65 logging
.error("Got exception from stub: %s", error
)
67 logging
.debug("GRPC Capabilities: %s", response
)
71 request
= frr_northbound_pb2
.GetRequest()
72 request
.path
.append(xpath
)
73 request
.type = frr_northbound_pb2
.GetRequest
.ALL
74 request
.encoding
= frr_northbound_pb2
.XML
76 for r
in self
.stub
.Get(request
):
77 logging
.info('GRPC Get path: "%s" value: %s', request
.path
, r
)
78 xml
+= str(r
.data
.data
)
82 def next_action(action_list
=None):
83 "Get next action from list or STDIN"
85 for action
in action_list
:
99 parser
= argparse
.ArgumentParser(description
="gRPC Client")
101 "-s", "--server", default
="localhost", help="gRPC Server Address"
104 "-p", "--port", type=int, default
=50051, help="gRPC Server TCP Port"
106 parser
.add_argument("-v", "--verbose", action
="store_true", help="be verbose")
107 parser
.add_argument("--check", action
="store_true", help="check runable")
108 parser
.add_argument("actions", nargs
="*", help="GETCAP|GET,xpath")
109 args
= parser
.parse_args(*args
)
111 level
= logging
.DEBUG
if args
.verbose
else logging
.INFO
114 format
="%(asctime)s %(levelname)s: GRPC-CLI-CLIENT: %(name)s %(message)s",
120 c
= GRPCClient(args
.server
, args
.port
)
122 for action
in next_action(args
.actions
):
123 action
= action
.casefold()
124 logging
.info("GOT ACTION: %s", action
)
125 if action
== "getcap":
126 caps
= c
.get_capabilities()
127 print("Capabilities:", caps
)
128 elif action
.startswith("get,"):
129 # Print Interface State and Config
130 _
, xpath
= action
.split(",", 1)
131 print("Get XPath: ", xpath
)
133 print("{}: {}".format(xpath
, xml
))
134 # for _ in range(0, 1):
137 if __name__
== "__main__":