]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/arrow/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightInfo.java
import quincy 17.2.0
[ceph.git] / ceph / src / arrow / java / flight / flight-core / src / main / java / org / apache / arrow / flight / FlightInfo.java
diff --git a/ceph/src/arrow/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightInfo.java b/ceph/src/arrow/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightInfo.java
new file mode 100644 (file)
index 0000000..e57b311
--- /dev/null
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.flight;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import org.apache.arrow.flight.impl.Flight;
+import org.apache.arrow.vector.ipc.ReadChannel;
+import org.apache.arrow.vector.ipc.WriteChannel;
+import org.apache.arrow.vector.ipc.message.IpcOption;
+import org.apache.arrow.vector.ipc.message.MessageSerializer;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.arrow.vector.validate.MetadataV4UnionChecker;
+
+import com.fasterxml.jackson.databind.util.ByteBufferBackedInputStream;
+import com.google.common.collect.ImmutableList;
+import com.google.protobuf.ByteString;
+
+/**
+ * A POJO representation of a FlightInfo, metadata associated with a set of data records.
+ */
+public class FlightInfo {
+  private final Schema schema;
+  private final FlightDescriptor descriptor;
+  private final List<FlightEndpoint> endpoints;
+  private final long bytes;
+  private final long records;
+  private final IpcOption option;
+
+  /**
+   * Constructs a new instance.
+   *
+   * @param schema The schema of the Flight
+   * @param descriptor An identifier for the Flight.
+   * @param endpoints A list of endpoints that have the flight available.
+   * @param bytes The number of bytes in the flight
+   * @param records The number of records in the flight.
+   */
+  public FlightInfo(Schema schema, FlightDescriptor descriptor, List<FlightEndpoint> endpoints, long bytes,
+      long records) {
+    this(schema, descriptor, endpoints, bytes, records, IpcOption.DEFAULT);
+  }
+
+  /**
+   * Constructs a new instance.
+   *
+   * @param schema The schema of the Flight
+   * @param descriptor An identifier for the Flight.
+   * @param endpoints A list of endpoints that have the flight available.
+   * @param bytes The number of bytes in the flight
+   * @param records The number of records in the flight.
+   * @param option IPC write options.
+   */
+  public FlightInfo(Schema schema, FlightDescriptor descriptor, List<FlightEndpoint> endpoints, long bytes,
+                    long records, IpcOption option) {
+    Objects.requireNonNull(schema);
+    Objects.requireNonNull(descriptor);
+    Objects.requireNonNull(endpoints);
+    MetadataV4UnionChecker.checkForUnion(schema.getFields().iterator(), option.metadataVersion);
+    this.schema = schema;
+    this.descriptor = descriptor;
+    this.endpoints = endpoints;
+    this.bytes = bytes;
+    this.records = records;
+    this.option = option;
+  }
+
+  /**
+   * Constructs from the protocol buffer representation.
+   */
+  FlightInfo(Flight.FlightInfo pbFlightInfo) throws URISyntaxException {
+    try {
+      final ByteBuffer schemaBuf = pbFlightInfo.getSchema().asReadOnlyByteBuffer();
+      schema = pbFlightInfo.getSchema().size() > 0 ?
+          MessageSerializer.deserializeSchema(
+              new ReadChannel(Channels.newChannel(new ByteBufferBackedInputStream(schemaBuf))))
+          : new Schema(ImmutableList.of());
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    descriptor = new FlightDescriptor(pbFlightInfo.getFlightDescriptor());
+    endpoints = new ArrayList<>();
+    for (final Flight.FlightEndpoint endpoint : pbFlightInfo.getEndpointList()) {
+      endpoints.add(new FlightEndpoint(endpoint));
+    }
+    bytes = pbFlightInfo.getTotalBytes();
+    records = pbFlightInfo.getTotalRecords();
+    option = IpcOption.DEFAULT;
+  }
+
+  public Schema getSchema() {
+    return schema;
+  }
+
+  public long getBytes() {
+    return bytes;
+  }
+
+  public long getRecords() {
+    return records;
+  }
+
+  public FlightDescriptor getDescriptor() {
+    return descriptor;
+  }
+
+  public List<FlightEndpoint> getEndpoints() {
+    return endpoints;
+  }
+
+  /**
+   * Converts to the protocol buffer representation.
+   */
+  Flight.FlightInfo toProtocol() {
+    // Encode schema in a Message payload
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    try {
+      MessageSerializer.serialize(new WriteChannel(Channels.newChannel(baos)), schema, option);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    return Flight.FlightInfo.newBuilder()
+        .addAllEndpoint(endpoints.stream().map(t -> t.toProtocol()).collect(Collectors.toList()))
+        .setSchema(ByteString.copyFrom(baos.toByteArray()))
+        .setFlightDescriptor(descriptor.toProtocol())
+        .setTotalBytes(FlightInfo.this.bytes)
+        .setTotalRecords(records)
+        .build();
+  }
+
+  /**
+   * Get the serialized form of this protocol message.
+   *
+   * <p>Intended to help interoperability by allowing non-Flight services to still return Flight types.
+   */
+  public ByteBuffer serialize() {
+    return ByteBuffer.wrap(toProtocol().toByteArray());
+  }
+
+  /**
+   * Parse the serialized form of this protocol message.
+   *
+   * <p>Intended to help interoperability by allowing Flight clients to obtain stream info from non-Flight services.
+   *
+   * @param serialized The serialized form of the FlightInfo, as returned by {@link #serialize()}.
+   * @return The deserialized FlightInfo.
+   * @throws IOException if the serialized form is invalid.
+   * @throws URISyntaxException if the serialized form contains an unsupported URI format.
+   */
+  public static FlightInfo deserialize(ByteBuffer serialized) throws IOException, URISyntaxException {
+    return new FlightInfo(Flight.FlightInfo.parseFrom(serialized));
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    FlightInfo that = (FlightInfo) o;
+    return bytes == that.bytes &&
+        records == that.records &&
+        schema.equals(that.schema) &&
+        descriptor.equals(that.descriptor) &&
+        endpoints.equals(that.endpoints);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(schema, descriptor, endpoints, bytes, records);
+  }
+
+  @Override
+  public String toString() {
+    return "FlightInfo{" +
+        "schema=" + schema +
+        ", descriptor=" + descriptor +
+        ", endpoints=" + endpoints +
+        ", bytes=" + bytes +
+        ", records=" + records +
+        '}';
+  }
+}