]> git.proxmox.com Git - ceph.git/blob - ceph/src/arrow/r/inst/demo_flight_server.py
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / r / inst / demo_flight_server.py
1 # Licensed to the Apache Software Foundation (ASF) under one
2 # or more contributor license agreements. See the NOTICE file
3 # distributed with this work for additional information
4 # regarding copyright ownership. The ASF licenses this file
5 # to you under the Apache License, Version 2.0 (the
6 # "License"); you may not use this file except in compliance
7 # with the License. You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing,
12 # software distributed under the License is distributed on an
13 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 # KIND, either express or implied. See the License for the
15 # specific language governing permissions and limitations
16 # under the License.
17
18 """
19 An example Flight Python server.
20 See https://github.com/apache/arrow/blob/master/python/examples/flight/server.py
21 """
22
23 import ast
24 import threading
25 import time
26
27 import pyarrow
28 import pyarrow.flight
29
30
31 class DemoFlightServer(pyarrow.flight.FlightServerBase):
32 def __init__(self, host="localhost", port=5005):
33 if isinstance(port, float):
34 # Because R is looser with integer vs. float
35 port = int(port)
36 location = "grpc+tcp://{}:{}".format(host, port)
37 super(DemoFlightServer, self).__init__(location)
38 self.flights = {}
39 self.host = host
40
41 @classmethod
42 def descriptor_to_key(self, descriptor):
43 return (descriptor.descriptor_type.value, descriptor.command,
44 tuple(descriptor.path or tuple()))
45
46 def _make_flight_info(self, key, descriptor, table):
47 location = pyarrow.flight.Location.for_grpc_tcp(self.host, self.port)
48 endpoints = [pyarrow.flight.FlightEndpoint(repr(key), [location]), ]
49
50 mock_sink = pyarrow.MockOutputStream()
51 stream_writer = pyarrow.RecordBatchStreamWriter(
52 mock_sink, table.schema)
53 stream_writer.write_table(table)
54 stream_writer.close()
55 data_size = mock_sink.size()
56
57 return pyarrow.flight.FlightInfo(table.schema,
58 descriptor, endpoints,
59 table.num_rows, data_size)
60
61 def list_flights(self, context, criteria):
62 print("list_flights")
63 for key, table in self.flights.items():
64 if key[1] is not None:
65 descriptor = \
66 pyarrow.flight.FlightDescriptor.for_command(key[1])
67 else:
68 descriptor = pyarrow.flight.FlightDescriptor.for_path(*key[2])
69
70 yield self._make_flight_info(key, descriptor, table)
71
72 def get_flight_info(self, context, descriptor):
73 print("get_flight_info")
74 key = DemoFlightServer.descriptor_to_key(descriptor)
75 if key in self.flights:
76 table = self.flights[key]
77 return self._make_flight_info(key, descriptor, table)
78 raise KeyError('Flight not found.')
79
80 def do_put(self, context, descriptor, reader, writer):
81 print("do_put")
82 key = DemoFlightServer.descriptor_to_key(descriptor)
83 print(key)
84 self.flights[key] = reader.read_all()
85 print(self.flights[key])
86
87 def do_get(self, context, ticket):
88 print("do_get")
89 key = ast.literal_eval(ticket.ticket.decode())
90 if key not in self.flights:
91 return None
92 return pyarrow.flight.RecordBatchStream(self.flights[key])
93
94 def list_actions(self, context):
95 print("list_actions")
96 return [
97 ("clear", "Clear the stored flights."),
98 ("shutdown", "Shut down this server."),
99 ]
100
101 def do_action(self, context, action):
102 print("do_action")
103 if action.type == "clear":
104 raise NotImplementedError(
105 "{} is not implemented.".format(action.type))
106 elif action.type == "healthcheck":
107 pass
108 elif action.type == "shutdown":
109 yield pyarrow.flight.Result(pyarrow.py_buffer(b'Shutdown!'))
110 # Shut down on background thread to avoid blocking current
111 # request
112 threading.Thread(target=self._shutdown).start()
113 else:
114 raise KeyError("Unknown action {!r}".format(action.type))
115
116 def _shutdown(self):
117 """Shut down after a delay."""
118 print("Server is shutting down...")
119 time.sleep(2)
120 self.shutdown()