]>
Commit | Line | Data |
---|---|---|
1d09f67e TL |
1 | /* |
2 | * Licensed to the Apache Software Foundation (ASF) under one or more | |
3 | * contributor license agreements. See the NOTICE file distributed with | |
4 | * this work for additional information regarding copyright ownership. | |
5 | * The ASF licenses this file to You under the Apache License, Version 2.0 | |
6 | * (the "License"); you may not use this file except in compliance with | |
7 | * the License. You may obtain a copy of the License at | |
8 | * | |
9 | * http://www.apache.org/licenses/LICENSE-2.0 | |
10 | * | |
11 | * Unless required by applicable law or agreed to in writing, software | |
12 | * distributed under the License is distributed on an "AS IS" BASIS, | |
13 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
14 | * See the License for the specific language governing permissions and | |
15 | * limitations under the License. | |
16 | */ | |
17 | ||
18 | package org.apache.arrow.flight; | |
19 | ||
20 | import java.io.ByteArrayOutputStream; | |
21 | import java.io.IOException; | |
22 | import java.net.URISyntaxException; | |
23 | import java.nio.ByteBuffer; | |
24 | import java.nio.channels.Channels; | |
25 | import java.util.ArrayList; | |
26 | import java.util.List; | |
27 | import java.util.Objects; | |
28 | import java.util.stream.Collectors; | |
29 | ||
30 | import org.apache.arrow.flight.impl.Flight; | |
31 | import org.apache.arrow.vector.ipc.ReadChannel; | |
32 | import org.apache.arrow.vector.ipc.WriteChannel; | |
33 | import org.apache.arrow.vector.ipc.message.IpcOption; | |
34 | import org.apache.arrow.vector.ipc.message.MessageSerializer; | |
35 | import org.apache.arrow.vector.types.pojo.Schema; | |
36 | import org.apache.arrow.vector.validate.MetadataV4UnionChecker; | |
37 | ||
38 | import com.fasterxml.jackson.databind.util.ByteBufferBackedInputStream; | |
39 | import com.google.common.collect.ImmutableList; | |
40 | import com.google.protobuf.ByteString; | |
41 | ||
42 | /** | |
43 | * A POJO representation of a FlightInfo, metadata associated with a set of data records. | |
44 | */ | |
45 | public class FlightInfo { | |
46 | private final Schema schema; | |
47 | private final FlightDescriptor descriptor; | |
48 | private final List<FlightEndpoint> endpoints; | |
49 | private final long bytes; | |
50 | private final long records; | |
51 | private final IpcOption option; | |
52 | ||
53 | /** | |
54 | * Constructs a new instance. | |
55 | * | |
56 | * @param schema The schema of the Flight | |
57 | * @param descriptor An identifier for the Flight. | |
58 | * @param endpoints A list of endpoints that have the flight available. | |
59 | * @param bytes The number of bytes in the flight | |
60 | * @param records The number of records in the flight. | |
61 | */ | |
62 | public FlightInfo(Schema schema, FlightDescriptor descriptor, List<FlightEndpoint> endpoints, long bytes, | |
63 | long records) { | |
64 | this(schema, descriptor, endpoints, bytes, records, IpcOption.DEFAULT); | |
65 | } | |
66 | ||
67 | /** | |
68 | * Constructs a new instance. | |
69 | * | |
70 | * @param schema The schema of the Flight | |
71 | * @param descriptor An identifier for the Flight. | |
72 | * @param endpoints A list of endpoints that have the flight available. | |
73 | * @param bytes The number of bytes in the flight | |
74 | * @param records The number of records in the flight. | |
75 | * @param option IPC write options. | |
76 | */ | |
77 | public FlightInfo(Schema schema, FlightDescriptor descriptor, List<FlightEndpoint> endpoints, long bytes, | |
78 | long records, IpcOption option) { | |
79 | Objects.requireNonNull(schema); | |
80 | Objects.requireNonNull(descriptor); | |
81 | Objects.requireNonNull(endpoints); | |
82 | MetadataV4UnionChecker.checkForUnion(schema.getFields().iterator(), option.metadataVersion); | |
83 | this.schema = schema; | |
84 | this.descriptor = descriptor; | |
85 | this.endpoints = endpoints; | |
86 | this.bytes = bytes; | |
87 | this.records = records; | |
88 | this.option = option; | |
89 | } | |
90 | ||
91 | /** | |
92 | * Constructs from the protocol buffer representation. | |
93 | */ | |
94 | FlightInfo(Flight.FlightInfo pbFlightInfo) throws URISyntaxException { | |
95 | try { | |
96 | final ByteBuffer schemaBuf = pbFlightInfo.getSchema().asReadOnlyByteBuffer(); | |
97 | schema = pbFlightInfo.getSchema().size() > 0 ? | |
98 | MessageSerializer.deserializeSchema( | |
99 | new ReadChannel(Channels.newChannel(new ByteBufferBackedInputStream(schemaBuf)))) | |
100 | : new Schema(ImmutableList.of()); | |
101 | } catch (IOException e) { | |
102 | throw new RuntimeException(e); | |
103 | } | |
104 | descriptor = new FlightDescriptor(pbFlightInfo.getFlightDescriptor()); | |
105 | endpoints = new ArrayList<>(); | |
106 | for (final Flight.FlightEndpoint endpoint : pbFlightInfo.getEndpointList()) { | |
107 | endpoints.add(new FlightEndpoint(endpoint)); | |
108 | } | |
109 | bytes = pbFlightInfo.getTotalBytes(); | |
110 | records = pbFlightInfo.getTotalRecords(); | |
111 | option = IpcOption.DEFAULT; | |
112 | } | |
113 | ||
114 | public Schema getSchema() { | |
115 | return schema; | |
116 | } | |
117 | ||
118 | public long getBytes() { | |
119 | return bytes; | |
120 | } | |
121 | ||
122 | public long getRecords() { | |
123 | return records; | |
124 | } | |
125 | ||
126 | public FlightDescriptor getDescriptor() { | |
127 | return descriptor; | |
128 | } | |
129 | ||
130 | public List<FlightEndpoint> getEndpoints() { | |
131 | return endpoints; | |
132 | } | |
133 | ||
134 | /** | |
135 | * Converts to the protocol buffer representation. | |
136 | */ | |
137 | Flight.FlightInfo toProtocol() { | |
138 | // Encode schema in a Message payload | |
139 | ByteArrayOutputStream baos = new ByteArrayOutputStream(); | |
140 | try { | |
141 | MessageSerializer.serialize(new WriteChannel(Channels.newChannel(baos)), schema, option); | |
142 | } catch (IOException e) { | |
143 | throw new RuntimeException(e); | |
144 | } | |
145 | return Flight.FlightInfo.newBuilder() | |
146 | .addAllEndpoint(endpoints.stream().map(t -> t.toProtocol()).collect(Collectors.toList())) | |
147 | .setSchema(ByteString.copyFrom(baos.toByteArray())) | |
148 | .setFlightDescriptor(descriptor.toProtocol()) | |
149 | .setTotalBytes(FlightInfo.this.bytes) | |
150 | .setTotalRecords(records) | |
151 | .build(); | |
152 | } | |
153 | ||
154 | /** | |
155 | * Get the serialized form of this protocol message. | |
156 | * | |
157 | * <p>Intended to help interoperability by allowing non-Flight services to still return Flight types. | |
158 | */ | |
159 | public ByteBuffer serialize() { | |
160 | return ByteBuffer.wrap(toProtocol().toByteArray()); | |
161 | } | |
162 | ||
163 | /** | |
164 | * Parse the serialized form of this protocol message. | |
165 | * | |
166 | * <p>Intended to help interoperability by allowing Flight clients to obtain stream info from non-Flight services. | |
167 | * | |
168 | * @param serialized The serialized form of the FlightInfo, as returned by {@link #serialize()}. | |
169 | * @return The deserialized FlightInfo. | |
170 | * @throws IOException if the serialized form is invalid. | |
171 | * @throws URISyntaxException if the serialized form contains an unsupported URI format. | |
172 | */ | |
173 | public static FlightInfo deserialize(ByteBuffer serialized) throws IOException, URISyntaxException { | |
174 | return new FlightInfo(Flight.FlightInfo.parseFrom(serialized)); | |
175 | } | |
176 | ||
177 | @Override | |
178 | public boolean equals(Object o) { | |
179 | if (this == o) { | |
180 | return true; | |
181 | } | |
182 | if (o == null || getClass() != o.getClass()) { | |
183 | return false; | |
184 | } | |
185 | FlightInfo that = (FlightInfo) o; | |
186 | return bytes == that.bytes && | |
187 | records == that.records && | |
188 | schema.equals(that.schema) && | |
189 | descriptor.equals(that.descriptor) && | |
190 | endpoints.equals(that.endpoints); | |
191 | } | |
192 | ||
193 | @Override | |
194 | public int hashCode() { | |
195 | return Objects.hash(schema, descriptor, endpoints, bytes, records); | |
196 | } | |
197 | ||
198 | @Override | |
199 | public String toString() { | |
200 | return "FlightInfo{" + | |
201 | "schema=" + schema + | |
202 | ", descriptor=" + descriptor + | |
203 | ", endpoints=" + endpoints + | |
204 | ", bytes=" + bytes + | |
205 | ", records=" + records + | |
206 | '}'; | |
207 | } | |
208 | } |