/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18 package org
.apache
.arrow
.flight
;
20 import java
.io
.ByteArrayOutputStream
;
21 import java
.io
.IOException
;
22 import java
.net
.URISyntaxException
;
23 import java
.nio
.ByteBuffer
;
24 import java
.nio
.channels
.Channels
;
25 import java
.util
.ArrayList
;
26 import java
.util
.List
;
27 import java
.util
.Objects
;
28 import java
.util
.stream
.Collectors
;
30 import org
.apache
.arrow
.flight
.impl
.Flight
;
31 import org
.apache
.arrow
.vector
.ipc
.ReadChannel
;
32 import org
.apache
.arrow
.vector
.ipc
.WriteChannel
;
33 import org
.apache
.arrow
.vector
.ipc
.message
.IpcOption
;
34 import org
.apache
.arrow
.vector
.ipc
.message
.MessageSerializer
;
35 import org
.apache
.arrow
.vector
.types
.pojo
.Schema
;
36 import org
.apache
.arrow
.vector
.validate
.MetadataV4UnionChecker
;
38 import com
.fasterxml
.jackson
.databind
.util
.ByteBufferBackedInputStream
;
39 import com
.google
.common
.collect
.ImmutableList
;
40 import com
.google
.protobuf
.ByteString
;
/**
 * A POJO representation of a FlightInfo, metadata associated with a set of data records.
 */
45 public class FlightInfo
{
// The schema of the flight.
private final Schema schema;
// An identifier for the flight.
private final FlightDescriptor descriptor;
// Endpoints that have the flight available.
private final List<FlightEndpoint> endpoints;
// The number of bytes in the flight.
private final long bytes;
// The number of records in the flight.
private final long records;
// IPC write options; passed to MessageSerializer when the schema is encoded in toProtocol().
private final IpcOption option;
/**
 * Constructs a new instance that uses {@link IpcOption#DEFAULT} as its IPC write options.
 *
 * @param schema The schema of the Flight
 * @param descriptor An identifier for the Flight.
 * @param endpoints A list of endpoints that have the flight available.
 * @param bytes The number of bytes in the flight
 * @param records The number of records in the flight.
 */
public FlightInfo(Schema schema, FlightDescriptor descriptor, List<FlightEndpoint> endpoints, long bytes,
    long records) {
  // Delegate to the full constructor so validation and field assignment live in one place.
  this(schema, descriptor, endpoints, bytes, records, IpcOption.DEFAULT);
}
/**
 * Constructs a new instance.
 *
 * @param schema The schema of the Flight
 * @param descriptor An identifier for the Flight.
 * @param endpoints A list of endpoints that have the flight available.
 * @param bytes The number of bytes in the flight
 * @param records The number of records in the flight.
 * @param option IPC write options.
 * @throws NullPointerException if {@code schema}, {@code descriptor}, {@code endpoints} or
 *     {@code option} is null.
 */
public FlightInfo(Schema schema, FlightDescriptor descriptor, List<FlightEndpoint> endpoints, long bytes,
    long records, IpcOption option) {
  Objects.requireNonNull(schema, "schema");
  Objects.requireNonNull(descriptor, "descriptor");
  Objects.requireNonNull(endpoints, "endpoints");
  // option is dereferenced on the next statement; check it explicitly so a null argument is
  // reported by name, like the other reference arguments.
  Objects.requireNonNull(option, "option");
  // Check the schema's fields against the metadata version selected by the IPC options.
  MetadataV4UnionChecker.checkForUnion(schema.getFields().iterator(), option.metadataVersion);
  this.schema = schema;
  this.descriptor = descriptor;
  // NOTE(review): the caller's list is stored as-is (no defensive copy).
  this.endpoints = endpoints;
  this.bytes = bytes;
  this.records = records;
  this.option = option;
}
/**
 * Constructs from the protocol buffer representation.
 *
 * <p>An empty schema payload yields a schema with no fields. The IPC options of the resulting
 * instance are {@link IpcOption#DEFAULT}.
 *
 * @param pbFlightInfo the protocol buffer message to convert.
 * @throws URISyntaxException if the message contains an unsupported URI format.
 */
FlightInfo(Flight.FlightInfo pbFlightInfo) throws URISyntaxException {
  final ByteString encodedSchema = pbFlightInfo.getSchema();
  if (encodedSchema.size() > 0) {
    final ByteBuffer schemaBuf = encodedSchema.asReadOnlyByteBuffer();
    try {
      final ReadChannel channel = new ReadChannel(Channels.newChannel(new ByteBufferBackedInputStream(schemaBuf)));
      schema = MessageSerializer.deserializeSchema(channel);
    } catch (IOException e) {
      // Surface a malformed schema payload as an unchecked failure.
      throw new RuntimeException(e);
    }
  } else {
    schema = new Schema(ImmutableList.of());
  }

  descriptor = new FlightDescriptor(pbFlightInfo.getFlightDescriptor());

  final List<FlightEndpoint> converted = new ArrayList<>();
  for (final Flight.FlightEndpoint pbEndpoint : pbFlightInfo.getEndpointList()) {
    converted.add(new FlightEndpoint(pbEndpoint));
  }
  endpoints = converted;

  bytes = pbFlightInfo.getTotalBytes();
  records = pbFlightInfo.getTotalRecords();
  option = IpcOption.DEFAULT;
}
114 public Schema
getSchema() {
118 public long getBytes() {
122 public long getRecords() {
126 public FlightDescriptor
getDescriptor() {
130 public List
<FlightEndpoint
> getEndpoints() {
135 * Converts to the protocol buffer representation.
137 Flight
.FlightInfo
toProtocol() {
138 // Encode schema in a Message payload
139 ByteArrayOutputStream baos
= new ByteArrayOutputStream();
141 MessageSerializer
.serialize(new WriteChannel(Channels
.newChannel(baos
)), schema
, option
);
142 } catch (IOException e
) {
143 throw new RuntimeException(e
);
145 return Flight
.FlightInfo
.newBuilder()
146 .addAllEndpoint(endpoints
.stream().map(t
-> t
.toProtocol()).collect(Collectors
.toList()))
147 .setSchema(ByteString
.copyFrom(baos
.toByteArray()))
148 .setFlightDescriptor(descriptor
.toProtocol())
149 .setTotalBytes(FlightInfo
.this.bytes
)
150 .setTotalRecords(records
)
155 * Get the serialized form of this protocol message.
157 * <p>Intended to help interoperability by allowing non-Flight services to still return Flight types.
159 public ByteBuffer
serialize() {
160 return ByteBuffer
.wrap(toProtocol().toByteArray());
164 * Parse the serialized form of this protocol message.
166 * <p>Intended to help interoperability by allowing Flight clients to obtain stream info from non-Flight services.
168 * @param serialized The serialized form of the FlightInfo, as returned by {@link #serialize()}.
169 * @return The deserialized FlightInfo.
170 * @throws IOException if the serialized form is invalid.
171 * @throws URISyntaxException if the serialized form contains an unsupported URI format.
173 public static FlightInfo
deserialize(ByteBuffer serialized
) throws IOException
, URISyntaxException
{
174 return new FlightInfo(Flight
.FlightInfo
.parseFrom(serialized
));
178 public boolean equals(Object o
) {
182 if (o
== null || getClass() != o
.getClass()) {
185 FlightInfo that
= (FlightInfo
) o
;
186 return bytes
== that
.bytes
&&
187 records
== that
.records
&&
188 schema
.equals(that
.schema
) &&
189 descriptor
.equals(that
.descriptor
) &&
190 endpoints
.equals(that
.endpoints
);
194 public int hashCode() {
195 return Objects
.hash(schema
, descriptor
, endpoints
, bytes
, records
);
199 public String
toString() {
200 return "FlightInfo{" +
202 ", descriptor=" + descriptor
+
203 ", endpoints=" + endpoints
+
205 ", records=" + records
+