git.proxmox.com Git - ceph.git/blob - ceph/src/arrow/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightInfo.java
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / java / flight / flight-core / src / main / java / org / apache / arrow / flight / FlightInfo.java
1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18 package org.apache.arrow.flight;
19
20 import java.io.ByteArrayOutputStream;
21 import java.io.IOException;
22 import java.net.URISyntaxException;
23 import java.nio.ByteBuffer;
24 import java.nio.channels.Channels;
25 import java.util.ArrayList;
26 import java.util.List;
27 import java.util.Objects;
28 import java.util.stream.Collectors;
29
30 import org.apache.arrow.flight.impl.Flight;
31 import org.apache.arrow.vector.ipc.ReadChannel;
32 import org.apache.arrow.vector.ipc.WriteChannel;
33 import org.apache.arrow.vector.ipc.message.IpcOption;
34 import org.apache.arrow.vector.ipc.message.MessageSerializer;
35 import org.apache.arrow.vector.types.pojo.Schema;
36 import org.apache.arrow.vector.validate.MetadataV4UnionChecker;
37
38 import com.fasterxml.jackson.databind.util.ByteBufferBackedInputStream;
39 import com.google.common.collect.ImmutableList;
40 import com.google.protobuf.ByteString;
41
42 /**
43 * A POJO representation of a FlightInfo, metadata associated with a set of data records.
44 */
45 public class FlightInfo {
46 private final Schema schema;
47 private final FlightDescriptor descriptor;
48 private final List<FlightEndpoint> endpoints;
49 private final long bytes;
50 private final long records;
51 private final IpcOption option;
52
53 /**
54 * Constructs a new instance.
55 *
56 * @param schema The schema of the Flight
57 * @param descriptor An identifier for the Flight.
58 * @param endpoints A list of endpoints that have the flight available.
59 * @param bytes The number of bytes in the flight
60 * @param records The number of records in the flight.
61 */
62 public FlightInfo(Schema schema, FlightDescriptor descriptor, List<FlightEndpoint> endpoints, long bytes,
63 long records) {
64 this(schema, descriptor, endpoints, bytes, records, IpcOption.DEFAULT);
65 }
66
67 /**
68 * Constructs a new instance.
69 *
70 * @param schema The schema of the Flight
71 * @param descriptor An identifier for the Flight.
72 * @param endpoints A list of endpoints that have the flight available.
73 * @param bytes The number of bytes in the flight
74 * @param records The number of records in the flight.
75 * @param option IPC write options.
76 */
77 public FlightInfo(Schema schema, FlightDescriptor descriptor, List<FlightEndpoint> endpoints, long bytes,
78 long records, IpcOption option) {
79 Objects.requireNonNull(schema);
80 Objects.requireNonNull(descriptor);
81 Objects.requireNonNull(endpoints);
82 MetadataV4UnionChecker.checkForUnion(schema.getFields().iterator(), option.metadataVersion);
83 this.schema = schema;
84 this.descriptor = descriptor;
85 this.endpoints = endpoints;
86 this.bytes = bytes;
87 this.records = records;
88 this.option = option;
89 }
90
91 /**
92 * Constructs from the protocol buffer representation.
93 */
94 FlightInfo(Flight.FlightInfo pbFlightInfo) throws URISyntaxException {
95 try {
96 final ByteBuffer schemaBuf = pbFlightInfo.getSchema().asReadOnlyByteBuffer();
97 schema = pbFlightInfo.getSchema().size() > 0 ?
98 MessageSerializer.deserializeSchema(
99 new ReadChannel(Channels.newChannel(new ByteBufferBackedInputStream(schemaBuf))))
100 : new Schema(ImmutableList.of());
101 } catch (IOException e) {
102 throw new RuntimeException(e);
103 }
104 descriptor = new FlightDescriptor(pbFlightInfo.getFlightDescriptor());
105 endpoints = new ArrayList<>();
106 for (final Flight.FlightEndpoint endpoint : pbFlightInfo.getEndpointList()) {
107 endpoints.add(new FlightEndpoint(endpoint));
108 }
109 bytes = pbFlightInfo.getTotalBytes();
110 records = pbFlightInfo.getTotalRecords();
111 option = IpcOption.DEFAULT;
112 }
113
114 public Schema getSchema() {
115 return schema;
116 }
117
118 public long getBytes() {
119 return bytes;
120 }
121
122 public long getRecords() {
123 return records;
124 }
125
126 public FlightDescriptor getDescriptor() {
127 return descriptor;
128 }
129
130 public List<FlightEndpoint> getEndpoints() {
131 return endpoints;
132 }
133
134 /**
135 * Converts to the protocol buffer representation.
136 */
137 Flight.FlightInfo toProtocol() {
138 // Encode schema in a Message payload
139 ByteArrayOutputStream baos = new ByteArrayOutputStream();
140 try {
141 MessageSerializer.serialize(new WriteChannel(Channels.newChannel(baos)), schema, option);
142 } catch (IOException e) {
143 throw new RuntimeException(e);
144 }
145 return Flight.FlightInfo.newBuilder()
146 .addAllEndpoint(endpoints.stream().map(t -> t.toProtocol()).collect(Collectors.toList()))
147 .setSchema(ByteString.copyFrom(baos.toByteArray()))
148 .setFlightDescriptor(descriptor.toProtocol())
149 .setTotalBytes(FlightInfo.this.bytes)
150 .setTotalRecords(records)
151 .build();
152 }
153
154 /**
155 * Get the serialized form of this protocol message.
156 *
157 * <p>Intended to help interoperability by allowing non-Flight services to still return Flight types.
158 */
159 public ByteBuffer serialize() {
160 return ByteBuffer.wrap(toProtocol().toByteArray());
161 }
162
163 /**
164 * Parse the serialized form of this protocol message.
165 *
166 * <p>Intended to help interoperability by allowing Flight clients to obtain stream info from non-Flight services.
167 *
168 * @param serialized The serialized form of the FlightInfo, as returned by {@link #serialize()}.
169 * @return The deserialized FlightInfo.
170 * @throws IOException if the serialized form is invalid.
171 * @throws URISyntaxException if the serialized form contains an unsupported URI format.
172 */
173 public static FlightInfo deserialize(ByteBuffer serialized) throws IOException, URISyntaxException {
174 return new FlightInfo(Flight.FlightInfo.parseFrom(serialized));
175 }
176
177 @Override
178 public boolean equals(Object o) {
179 if (this == o) {
180 return true;
181 }
182 if (o == null || getClass() != o.getClass()) {
183 return false;
184 }
185 FlightInfo that = (FlightInfo) o;
186 return bytes == that.bytes &&
187 records == that.records &&
188 schema.equals(that.schema) &&
189 descriptor.equals(that.descriptor) &&
190 endpoints.equals(that.endpoints);
191 }
192
193 @Override
194 public int hashCode() {
195 return Objects.hash(schema, descriptor, endpoints, bytes, records);
196 }
197
198 @Override
199 public String toString() {
200 return "FlightInfo{" +
201 "schema=" + schema +
202 ", descriptor=" + descriptor +
203 ", endpoints=" + endpoints +
204 ", bytes=" + bytes +
205 ", records=" + records +
206 '}';
207 }
208 }