# git.proxmox.com Git - ceph.git/blob - ceph/src/arrow/r/inst/demo_flight_server.py
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""
An example Flight Python server.
See https://github.com/apache/arrow/blob/master/python/examples/flight/server.py
"""
class DemoFlightServer(pyarrow.flight.FlightServerBase):
    """Demo Flight server that keeps uploaded tables in an in-memory dict.

    Tables are stored in ``self.flights`` under the key produced by
    :meth:`descriptor_to_key`.
    """

    def __init__(self, host="localhost", port=5005):
        """Create a server listening on ``grpc+tcp://<host>:<port>``.

        Args:
            host: Host name placed in the listen location.
            port: Port number. A float is converted to ``int`` first.
        """
        if isinstance(port, float):
            # Because R is looser with integer vs. float
            port = int(port)
        location = "grpc+tcp://{}:{}".format(host, port)
        super(DemoFlightServer, self).__init__(location)
        # Key -> table store used by list_flights, get_flight_info,
        # do_put and do_get.
        self.flights = {}
        # Used by _make_flight_info to advertise the endpoint location.
        # NOTE(review): self.port is read there too but is not assigned
        # here; presumably the base class provides it -- confirm.
        self.host = host

    @classmethod
    def descriptor_to_key(cls, descriptor):
        """Return a hashable key for ``descriptor``.

        The key is ``(descriptor_type.value, command, tuple(path))``; a
        falsy ``path`` becomes an empty tuple.

        This is a classmethod because callers invoke it on the class
        itself (``DemoFlightServer.descriptor_to_key(descriptor)``).
        """
        return (descriptor.descriptor_type.value, descriptor.command,
                tuple(descriptor.path or tuple()))

    def _make_flight_info(self, key, descriptor, table):
        """Build the ``FlightInfo`` describing ``table``.

        Args:
            key: Storage key; its ``repr`` is used as the endpoint ticket
                (``do_get`` parses it back with ``ast.literal_eval``).
            descriptor: Flight descriptor to report for the table.
            table: The stored table.

        Returns:
            A ``pyarrow.flight.FlightInfo`` carrying the schema, the
            descriptor, a single endpoint, the row count and the size of
            the table written as a record batch stream.
        """
        location = pyarrow.flight.Location.for_grpc_tcp(self.host, self.port)
        endpoints = [pyarrow.flight.FlightEndpoint(repr(key), [location])]

        # Write the table to a mock sink; only the sink's size is used.
        mock_sink = pyarrow.MockOutputStream()
        stream_writer = pyarrow.RecordBatchStreamWriter(
            mock_sink, table.schema)
        try:
            stream_writer.write_table(table)
        finally:
            # Close the writer before reading the size so the stream is
            # complete and the writer is not left open.
            stream_writer.close()
        data_size = mock_sink.size()

        return pyarrow.flight.FlightInfo(table.schema,
                                         descriptor, endpoints,
                                         table.num_rows, data_size)

    def list_flights(self, context, criteria):
        """Yield a ``FlightInfo`` for every stored table.

        ``context`` and ``criteria`` are accepted but not used.
        """
        for key, table in self.flights.items():
            # key is (descriptor type value, command, path tuple); rebuild
            # the descriptor from whichever of command/path was stored.
            if key[1] is not None:
                descriptor = \
                    pyarrow.flight.FlightDescriptor.for_command(key[1])
            else:
                descriptor = pyarrow.flight.FlightDescriptor.for_path(*key[2])

            yield self._make_flight_info(key, descriptor, table)

    def get_flight_info(self, context, descriptor):
        """Return the ``FlightInfo`` for the table stored under ``descriptor``.

        Raises:
            KeyError: If no table is stored for the descriptor.
        """
        print("get_flight_info")
        key = DemoFlightServer.descriptor_to_key(descriptor)
        if key in self.flights:
            table = self.flights[key]
            return self._make_flight_info(key, descriptor, table)
        raise KeyError('Flight not found.')

    def do_put(self, context, descriptor, reader, writer):
        """Read the whole upload from ``reader`` and store it.

        The table replaces any existing entry for the same descriptor
        key and is printed after being stored. ``context`` and ``writer``
        are not used.
        """
        key = DemoFlightServer.descriptor_to_key(descriptor)
        self.flights[key] = reader.read_all()
        print(self.flights[key])

    def do_get(self, context, ticket):
        """Return a record batch stream for the table named by ``ticket``.

        The ticket bytes are expected to be the ``repr`` of a storage key
        (as produced by ``_make_flight_info``); they are parsed with
        ``ast.literal_eval``, which evaluates literals only.

        Raises:
            KeyError: If no table is stored under the decoded key.
        """
        key = ast.literal_eval(ticket.ticket.decode())
        if key not in self.flights:
            # Same error as get_flight_info uses for an unknown flight.
            raise KeyError('Flight not found.')
        return pyarrow.flight.RecordBatchStream(self.flights[key])

    def list_actions(self, context):
        """Return the advertised actions as ``(name, description)`` pairs."""
        return [
            ("clear", "Clear the stored flights."),
            ("shutdown", "Shut down this server."),
        ]

    def do_action(self, context, action):
        """Run ``action``; this is a generator of ``Result`` objects.

        Because the body contains ``yield``, nothing runs (and nothing is
        raised) until the returned generator is iterated.

        Raises:
            NotImplementedError: For the ``"clear"`` action.
            KeyError: For any action type not handled below.
        """
        if action.type == "clear":
            raise NotImplementedError(
                "{} is not implemented.".format(action.type))
        elif action.type == "healthcheck":
            # Nothing to do and no results to yield.
            pass
        elif action.type == "shutdown":
            yield pyarrow.flight.Result(pyarrow.py_buffer(b'Shutdown!'))
            # Shut down on background thread to avoid blocking current
            # request
            threading.Thread(target=self._shutdown).start()
        else:
            raise KeyError("Unknown action {!r}".format(action.type))

    def _shutdown(self):
        """Shut down after a delay."""
        # Local import: `time` is only needed by this method.
        import time

        print("Server is shutting down...")
        # NOTE(review): the delay length is an assumption; presumably it
        # lets the "Shutdown!" result reach the client first -- confirm.
        time.sleep(2)
        self.shutdown()